You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/08/10 04:13:43 UTC
[2/6] kylin git commit: KYLIN-1726 Scalable streaming cubing
KYLIN-1726 Scalable streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/acde3396
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/acde3396
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/acde3396
Branch: refs/heads/master
Commit: acde339623d43fa9b441614bc64ca7960e9255fe
Parents: a2b693c
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jul 3 21:43:16 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:10:10 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 11 ++++----
.../kylin/job/streaming/KafkaDataLoader.java | 11 ++++++--
.../kylin/job/streaming/StreamDataLoader.java | 22 +++++++++++++++
.../java/org/apache/kylin/source/ISource.java | 4 +++
.../org/apache/kylin/source/SourceFactory.java | 5 ++++
.../kylin/engine/mr/BatchCubingJobBuilder.java | 1 +
.../kylin/engine/mr/JobBuilderSupport.java | 2 +-
.../engine/mr/common/AbstractHadoopJob.java | 28 ++++++++++++++++++++
.../apache/kylin/engine/mr/steps/CuboidJob.java | 1 +
.../engine/mr/steps/FactDistinctColumnsJob.java | 3 +++
.../kylin/engine/mr/steps/InMemCuboidJob.java | 1 +
.../test_streaming_table_cube_desc.json | 3 +++
.../kafka/DEFAULT.STREAMING_TABLE.json | 2 +-
.../kafka/default.streaming_table.json | 2 +-
.../streaming/DEFAULT.STREAMING_TABLE.json | 2 +-
.../streaming/default.streaming_table.json | 2 +-
.../table/DEFAULT.STREAMING_TABLE.json | 3 ++-
.../kylin/provision/BuildCubeWithStream.java | 3 ++-
.../apache/kylin/source/hive/HiveSource.java | 8 ++++++
19 files changed, 99 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 6128770..986edf6 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -36,7 +36,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamDataLoader;
import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -45,7 +45,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.HiveClient;
import org.apache.kylin.source.hive.HiveCmdBuilder;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.slf4j.Logger;
@@ -148,15 +147,15 @@ public class DeployUtil {
deployHiveTables();
}
- public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+ public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
TableDesc tableDesc = cubeInstance.getFactTableDesc();
//load into kafka
- KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
- KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2);
- logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
+ streamDataLoader.loadIntoKafka(data);
+ streamDataLoader.loadIntoKafka(data2);
+ logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
//csv data for H2 use
List<TblColRef> tableColumns = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 5242ff2..0eaae20 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -30,6 +30,7 @@ import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
@@ -38,9 +39,15 @@ import kafka.producer.ProducerConfig;
/**
* Load prepared data into kafka(for test use)
*/
-public class KafkaDataLoader {
+public class KafkaDataLoader extends StreamDataLoader {
+ List<KafkaClusterConfig> kafkaClusterConfigs;
- public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
+ public KafkaDataLoader(KafkaConfig kafkaConfig) {
+ super(kafkaConfig);
+ this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
+ }
+
+ public void loadIntoKafka(List<String> messages) {
KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
new file mode 100644
index 0000000..50fc883
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.List;
+
+/** Base class for the test-data loaders: keeps the KafkaConfig handed to the constructor,
+ * leaves loadIntoKafka(List) to subclasses, and reports the configured topic via toString(). */
+public abstract class StreamDataLoader {
+ protected KafkaConfig kafkaConfig; // set once in the constructor; read by toString()
+ public StreamDataLoader(KafkaConfig kafkaConfig) {
+ this.kafkaConfig = kafkaConfig;
+ }
+
+ abstract public void loadIntoKafka(List<String> messages); // subclasses decide how the messages are delivered
+
+ @Override
+ public String toString() {
+ return "kafka topic " + kafkaConfig.getTopic(); // DeployUtil logs this string after loading
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 3cd8a02..e9216f9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -20,9 +20,13 @@ package org.apache.kylin.source;
import org.apache.kylin.metadata.model.TableDesc;
+import java.util.List;
+
public interface ISource {
public <I> I adaptToBuildEngine(Class<I> engineInterface);
public ReadableTable createReadableTable(TableDesc tableDesc);
+
+ public List<String> getMRDependentResources(TableDesc table);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index f701a0f..e82c6ed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -18,6 +18,7 @@
package org.apache.kylin.source;
+import java.util.List;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
@@ -45,4 +46,8 @@ public class SourceFactory {
return tableSource(table).adaptToBuildEngine(engineInterface);
}
+ public static List<String> getMRDependentResources(TableDesc table) {
+ return tableSource(table).getMRDependentResources(table);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index ec9b1c6..5a098a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -99,6 +99,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 3e9aff6..86451c9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -73,7 +73,7 @@ public class JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
result.setMapReduceParams(cmd.toString());
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 02928e0..04ecc71 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
@@ -67,6 +68,7 @@ import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,6 +166,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
+ String kylinKafkaDependency = System.getProperty("kylin.kafka.dependency");
logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
Configuration jobConf = job.getConfiguration();
@@ -221,6 +224,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
+ // for kafka dependencies
+ if (kylinKafkaDependency != null) {
+ kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
+
+ logger.info("Kafka Dependencies Before Filtered: " + kylinHiveDependency);
+
+ if (kylinDependency.length() > 0)
+ kylinDependency.append(",");
+ kylinDependency.append(kylinKafkaDependency);
+ } else {
+
+ logger.info("No Kafka dependency jars set in the environment, will find them from jvm:");
+
+ try {
+ String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
+ kylinDependency.append(kafkaClientJarPath).append(",");
+ logger.info("kafka jar file: " + kafkaClientJarPath);
+
+ } catch (ClassNotFoundException e) {
+ logger.error("Cannot found kafka dependency jars: " + e);
+ }
+ }
+
// for KylinJobMRLibDir
String mrLibDir = kylinConf.getKylinJobMRLibDir();
if (!StringUtils.isBlank(mrLibDir)) {
@@ -442,6 +468,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
TableDesc table = metaMgr.getTableDesc(tableName);
dumpList.add(table.getResourcePath());
+ List<String> dependentResources = SourceFactory.getMRDependentResources(table);
+ dumpList.addAll(dependentResources);
}
for (CubeSegment segment : cube.getSegments()) {
dumpList.addAll(segment.getDictionaryPaths());
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index f037d2e..f3524f8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -109,6 +109,7 @@ public class CuboidJob extends AbstractHadoopJob {
}
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 39aae72..f091ab9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -54,6 +54,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_CUBING_JOB_ID);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_SEGMENT_NAME);
options.addOption(OPTION_STATISTICS_ENABLED);
@@ -62,6 +63,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index f5076e4..510dbe8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -95,6 +95,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
}
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 0267db5..23e5b00 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -105,6 +105,9 @@
"joint_dims" : [ ]
}
} ],
+ "override_kylin_properties": {
+ "kylin.cube.algorithm": "random"
+ },
"notify_list" : [ ],
"status_need_notify" : [ ],
"auto_merge_time_ranges" : null,
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
@@ -1,7 +1,7 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"topic": "test_streaming_table_topic_xyz",
"timeout": 60000,
"bufferSize": 65536,
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
@@ -1,7 +1,7 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"topic": "test_streaming_table_topic_xyz",
"timeout": 60000,
"bufferSize": 65536,
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
@@ -1,6 +1,6 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"type": "kafka",
"last_modified": 0
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
@@ -1,6 +1,6 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"type": "kafka",
"last_modified": 0
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
index 82f6fdb..5bcfa35 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
@@ -41,5 +41,6 @@
],
"database": "DEFAULT",
"source_type": 1,
- "last_modified": 0
+ "last_modified": 0,
+ "source_type" : 1
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 95f0f3d..9490560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -33,6 +33,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -99,7 +100,7 @@ public class BuildCubeWithStream {
streamingConfig.setTopic(UUID.randomUUID().toString());
KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
}
public void cleanup() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index b7dbff0..e9cebea 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,11 +18,14 @@
package org.apache.kylin.source.hive;
+import com.google.common.collect.Lists;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
+import java.util.List;
+
//used by reflection
public class HiveSource implements ISource {
@@ -41,4 +44,9 @@ public class HiveSource implements ISource {
return new HiveTable(tableDesc);
}
+ @Override
+ public List<String> getMRDependentResources(TableDesc table) {
+ return Lists.newArrayList();
+ }
+
}