You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original page) is not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/10/08 02:25:50 UTC
[28/50] [abbrv] kylin git commit: KYLIN-1962 reorg BuildCubeWithStream test case
KYLIN-1962 reorg BuildCubeWithStream test case
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/859230d7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/859230d7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/859230d7
Branch: refs/heads/orderedbytes
Commit: 859230d787f9c218f56e56308897b68fb23d8dc4
Parents: ab5563a
Author: shaofengshi <sh...@apache.org>
Authored: Mon Sep 26 18:10:32 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 2 +-
.../java/org/apache/kylin/cube/CubeManager.java | 20 +++
.../java/org/apache/kylin/cube/CubeSegment.java | 7 +-
.../test_streaming_table_cube_desc.json | 17 ++-
.../test_streaming_table_model_desc.json | 3 +-
.../kylin/provision/BuildCubeWithStream.java | 121 +++++++++++------
.../kylin/provision/BuildCubeWithStream2.java | 134 -------------------
7 files changed, 116 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3e41055..838ef97 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -807,6 +807,6 @@ abstract public class KylinConfigBase implements Serializable {
}
public int getMaxBuildingSegments() {
- return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "1"));
+ return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2"));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 463c8e9..962568c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -29,10 +29,12 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
@@ -476,6 +478,24 @@ public class CubeManager implements IRealizationProvider {
if (pair.getFirst() == false || pair.getSecond() == false)
throw new IllegalArgumentException("The new refreshing segment " + newSegment + " does not match any existing segment in cube " + cube);
+ if (startOffset > 0 || endOffset > 0) {
+ CubeSegment toRefreshSeg = null;
+ for (CubeSegment cubeSegment : cube.getSegments()) {
+ if (cubeSegment.getSourceOffsetStart() == startOffset && cubeSegment.getSourceOffsetEnd() == endOffset) {
+ toRefreshSeg = cubeSegment;
+ break;
+ }
+ }
+
+ if (toRefreshSeg == null) {
+ throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time");
+ }
+
+ Map<String, String> partitionInfo = Maps.newHashMap();
+ partitionInfo.putAll(toRefreshSeg.getAdditionalInfo());
+ newSegment.setAdditionalInfo(partitionInfo);
+ }
+
CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(newSegment);
updateCube(cubeBuilder);
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index afb0d28..d5de47f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -20,7 +20,6 @@ package org.apache.kylin.cube;
import java.text.SimpleDateFormat;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -106,7 +105,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
@JsonProperty("additionalInfo")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
- private HashMap<String, String> additionalInfo = new LinkedHashMap<String, String>();
+ private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
@@ -543,11 +542,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
this.indexPath = indexPath;
}
- public HashMap<String, String> getAdditionalInfo() {
+ public Map<String, String> getAdditionalInfo() {
return additionalInfo;
}
- public void setAdditionalInfo(HashMap<String, String> additionalInfo) {
+ public void setAdditionalInfo(Map<String, String> additionalInfo) {
this.additionalInfo = additionalInfo;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 8279417..640b91c 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -1,30 +1,29 @@
{
"uuid" : "901ed15e-7769-4c66-b7ae-fbdc971cd192",
-
"name" : "test_streaming_table_cube_desc",
"description" : "",
"dimensions" : [ {
- "name" : "DEFAULT.STREAMING_TABLE.SITE",
+ "name" : "SITE",
"table" : "DEFAULT.STREAMING_TABLE",
"column" : "SITE",
"derived" : null
}, {
- "name" : "DEFAULT.STREAMING_TABLE.ITM",
+ "name" : "ITM",
"table" : "DEFAULT.STREAMING_TABLE",
"column" : "ITM",
"derived" : null
}, {
- "name" : "TIME",
+ "name" : "DAY_START",
"table" : "DEFAULT.STREAMING_TABLE",
"column" : "DAY_START",
"derived" : null
}, {
- "name" : "TIME",
+ "name" : "HOUR_START",
"table" : "DEFAULT.STREAMING_TABLE",
"column" : "HOUR_START",
"derived" : null
}, {
- "name" : "TIME",
+ "name" : "MINUTE_START",
"table" : "DEFAULT.STREAMING_TABLE",
"column" : "MINUTE_START",
"derived" : null
@@ -68,13 +67,13 @@
} ],
"rowkey" : {
"rowkey_columns" : [ {
- "column" : "DAY_START",
+ "column" : "MINUTE_START",
"encoding" : "dict"
}, {
"column" : "HOUR_START",
"encoding" : "dict"
}, {
- "column" : "MINUTE_START",
+ "column" : "DAY_START",
"encoding" : "dict"
}, {
"column" : "SITE",
@@ -107,7 +106,7 @@
} ],
"override_kylin_properties": {
"kylin.cube.algorithm": "inmem",
- "kylin.cube.building.segment.max": "3"
+ "kylin.cube.building.segment.max": "5"
},
"notify_list" : [ ],
"status_need_notify" : [ ],
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
index e6977e1..23b10f7 100644
--- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
@@ -1,5 +1,4 @@
{
-
"uuid": "ff527b94-f860-44c3-8452-93b177888732",
"name": "test_streaming_table_model_desc",
"dimensions": [
@@ -23,7 +22,7 @@
"fact_table": "DEFAULT.STREAMING_TABLE",
"filter_condition": null,
"partition_desc": {
- "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start",
+ "partition_date_column": "DEFAULT.STREAMING_TABLE.MINUTE_START",
"partition_date_start": 0,
"partition_type": "APPEND"
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index dfcedfb..23d7ca8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -20,12 +20,21 @@ package org.apache.kylin.provision;
import java.io.File;
import java.io.IOException;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.TimeZone;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -47,6 +56,7 @@ import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -55,6 +65,8 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.Thread.sleep;
+
/**
* for streaming cubing case "test_streaming_table"
*/
@@ -70,6 +82,7 @@ public class BuildCubeWithStream {
private KafkaConfig kafkaConfig;
private MockKafka kafkaServer;
protected static boolean fastBuildMode = false;
+ private boolean generateData = true;
public void before() throws Exception {
deployEnv();
@@ -139,44 +152,91 @@ public class BuildCubeWithStream {
public void build() throws Exception {
clearSegment(cubeName);
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
- long date1 = 0;
- long date2 = f.parse("2013-01-01").getTime();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+ long dateStart = 0;
+ try {
+ dateStart = f.parse("2012-01-01").getTime();
+ } catch (ParseException e) {
+ }
+ Random rand = new Random();
+ while (generateData == true) {
+ long dateEnd = dateStart + 7 * 24 * 3600000;
+ try {
+ generateStreamData(dateStart, dateEnd, rand.nextInt(100));
+ dateStart = dateEnd;
+ sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }).start();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+
+ List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
+ for (int i = 0; i < 5; i++) {
+ Thread.sleep(2 * 60 * 1000); // wait for new messages
+ FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
+ @Override
+ public ExecutableState call() {
+ ExecutableState result = null;
+ try {
+ result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ } catch (Exception e) {
+ // previous build hasn't been started, or other case.
+ e.printStackTrace();
+ }
+
+ return result;
+ }
+ });
- int numberOfRecrods1 = 10000;
- generateStreamData(date1, date2, numberOfRecrods1);
- ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
- Assert.assertTrue(result == ExecutableState.SUCCEED);
+ executorService.submit(futureTask);
+ futures.add(futureTask);
+ }
- if (fastBuildMode == false) {
- long date3 = f.parse("2013-04-01").getTime();
- int numberOfRecords2 = 5000;
- generateStreamData(date2, date3, numberOfRecords2);
- result = buildSegment(cubeName, 0, Long.MAX_VALUE);
- Assert.assertTrue(result == ExecutableState.SUCCEED);
+ generateData = false; // stop generating message to kafka
+ executorService.shutdown();
+ int succeedBuild = 0;
+ for (int i = 0; i < futures.size(); i++) {
+ ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
+ logger.info("Checking building task " + i + " whose state is " + result);
+ Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED );
+ if (result == ExecutableState.SUCCEED)
+ succeedBuild++;
+ }
+
+ logger.info(succeedBuild + " build jobs have been successfully completed.");
+ List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
+ Assert.assertTrue(segments.size() == succeedBuild);
+
+ if (fastBuildMode == false) {
//empty build
- result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
Assert.assertTrue(result == ExecutableState.DISCARDED);
+ long endOffset = segments.get(segments.size() - 1).getSourceOffsetEnd();
//merge
- result = mergeSegment(cubeName, 0, 15000);
+ result = mergeSegment(cubeName, 0, endOffset);
Assert.assertTrue(result == ExecutableState.SUCCEED);
- List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
+ segments = cubeManager.getCube(cubeName).getSegments();
Assert.assertTrue(segments.size() == 1);
CubeSegment toRefreshSeg = segments.get(0);
- HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
- refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
+ refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd());
segments = cubeManager.getCube(cubeName).getSegments();
Assert.assertTrue(segments.size() == 1);
}
-
}
+
private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
@@ -185,14 +245,8 @@ public class BuildCubeWithStream {
return job.getStatus();
}
- private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
+ private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
- segment.setAdditionalInfo(partitionOffsetMap);
- CubeInstance cubeInstance = cubeManager.getCube(cubeName);
- CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
@@ -209,8 +263,8 @@ public class BuildCubeWithStream {
protected void deployEnv() throws IOException {
DeployUtil.overrideJobJarLocations();
-// DeployUtil.initCliWorkDir();
-// DeployUtil.deployMetadata();
+ // DeployUtil.initCliWorkDir();
+ // DeployUtil.deployMetadata();
}
public static void beforeClass() throws Exception {
@@ -265,13 +319,4 @@ public class BuildCubeWithStream {
}
}
-
- protected int cleanupOldStorage() throws Exception {
- String[] args = { "--delete", "true" };
-
- // KapStorageCleanupCLI cli = new KapStorageCleanupCLI();
- // cli.execute(args);
- return 0;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
deleted file mode 100644
index d8c857f..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.provision;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.Thread.sleep;
-
-/**
- * for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently.
- */
-public class BuildCubeWithStream2 extends BuildCubeWithStream {
-
- private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
- private boolean generateData = true;
-
- @Override
- public void build() throws Exception {
- clearSegment(cubeName);
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
- final long date1 = 0;
- final long date2 = f.parse("2013-01-01").getTime();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
-
- Random rand = new Random();
- while (generateData == true) {
- try {
- generateStreamData(date1, date2, rand.nextInt(100));
- sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time, from 0 to 100 seconds
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- ExecutorService executorService = Executors.newFixedThreadPool(4);
-
- List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
- for (int i = 0; i < 5; i++) {
- Thread.sleep(2 * 60 * 1000); // sleep 2 mintues
- FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
- @Override
- public ExecutableState call() {
- ExecutableState result = null;
- try {
- result = buildSegment(cubeName, 0, Long.MAX_VALUE);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return result;
- }
- });
-
- executorService.submit(futureTask);
- futures.add(futureTask);
- }
-
- generateData = false; // stop generating message to kafka
- executorService.shutdown();
- int succeedBuild = 0;
- for (int i = 0; i < futures.size(); i++) {
- ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
- logger.info("Checking building task " + i + " whose state is " + result);
- Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED );
- if (result == ExecutableState.SUCCEED)
- succeedBuild++;
- }
-
- logger.info(succeedBuild + " build jobs have been successfully completed.");
- List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
- Assert.assertTrue(segments.size() == succeedBuild);
-
- }
-
- public static void main(String[] args) throws Exception {
- try {
- beforeClass();
-
- BuildCubeWithStream2 buildCubeWithStream = new BuildCubeWithStream2();
- buildCubeWithStream.before();
- buildCubeWithStream.build();
- logger.info("Build is done");
- buildCubeWithStream.after();
- afterClass();
- logger.info("Going to exit");
- System.exit(0);
- } catch (Exception e) {
- logger.error("error", e);
- System.exit(1);
- }
-
- }
-
-}