You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/20 03:43:49 UTC
[02/13] kylin git commit: Revert "KYLIN-1726 Scalable streaming cubing"
Revert "KYLIN-1726 Scalable streaming cubing"
This reverts commit 81c7323b633df88eedac8b319fc57f9b62b01a4a.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/506cd783
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/506cd783
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/506cd783
Branch: refs/heads/master
Commit: 506cd783132023a06f1669ad248b74bf9d96d0e1
Parents: 1f48804
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 19 23:55:54 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Sep 20 11:43:08 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 35 +--
.../kylin/job/streaming/Kafka10DataLoader.java | 80 -------
.../apache/kylin/common/KylinConfigBase.java | 1 -
.../java/org/apache/kylin/cube/CubeSegment.java | 1 -
.../java/org/apache/kylin/cube/ISegment.java | 39 ++++
.../cube/gridtable/SegmentGTStartAndEnd.java | 2 +-
.../cube/model/CubeJoinedFlatTableDesc.java | 6 -
.../cube/model/CubeJoinedFlatTableEnrich.java | 6 -
.../apache/kylin/gridtable/ScannerWorker.java | 2 +-
.../metadata/model/IJoinedFlatTableDesc.java | 2 -
.../apache/kylin/metadata/model/ISegment.java | 36 ---
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 3 -
.../org/apache/kylin/engine/mr/IMRInput.java | 10 -
.../java/org/apache/kylin/engine/mr/MRUtil.java | 4 -
.../test_streaming_table_model_desc.json | 6 +-
.../kylin/provision/BuildCubeWithStream.java | 218 +++++-------------
.../org/apache/kylin/provision/MockKafka.java | 191 ----------------
.../apache/kylin/provision/NetworkUtils.java | 52 -----
pom.xml | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 11 -
source-kafka/pom.xml | 13 +-
.../kylin/source/kafka/KafkaConfigManager.java | 46 ++--
.../apache/kylin/source/kafka/KafkaMRInput.java | 221 -------------------
.../apache/kylin/source/kafka/KafkaSource.java | 57 -----
.../kylin/source/kafka/KafkaStreamingInput.java | 65 +++---
.../kylin/source/kafka/MergeOffsetStep.java | 89 --------
.../kylin/source/kafka/SeekOffsetStep.java | 119 ----------
.../kylin/source/kafka/StreamingParser.java | 49 ++--
.../source/kafka/StringStreamingParser.java | 49 ++--
.../source/kafka/TimedJsonStreamParser.java | 49 ++--
.../apache/kylin/source/kafka/TopicMeta.java | 49 ++--
.../kylin/source/kafka/UpdateTimeRangeStep.java | 108 ---------
.../source/kafka/config/KafkaClusterConfig.java | 3 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 165 --------------
.../kafka/hadoop/KafkaFlatTableMapper.java | 51 -----
.../source/kafka/hadoop/KafkaInputFormat.java | 98 --------
.../kafka/hadoop/KafkaInputRecordReader.java | 166 --------------
.../source/kafka/hadoop/KafkaInputSplit.java | 102 ---------
.../kylin/source/kafka/util/KafkaClient.java | 115 ----------
.../source/kafka/util/KafkaOffsetMapping.java | 97 --------
.../kylin/source/kafka/util/KafkaRequester.java | 56 +++--
.../kylin/source/kafka/util/KafkaUtils.java | 3 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 2 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +-
45 files changed, 348 insertions(+), 2135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9b282e3..8c64f91 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -143,12 +143,14 @@ public class DeployUtil {
deployHiveTables();
}
- public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
+ public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getFactTable());
+ List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
+ List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
TableDesc tableDesc = cubeInstance.getFactTableDesc();
//load into kafka
streamDataLoader.loadIntoKafka(data);
+ streamDataLoader.loadIntoKafka(data2);
logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
//csv data for H2 use
@@ -163,7 +165,7 @@ public class DeployUtil {
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
- appendFactTableData(sb.toString(), cubeInstance.getFactTable());
+ overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
}
public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
@@ -177,33 +179,6 @@ public class DeployUtil {
in.close();
}
- public static void appendFactTableData(String factTableContent, String factTableName) throws IOException {
- // Write to resource store
- ResourceStore store = ResourceStore.getStore(config());
-
- InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"));
- String factTablePath = "/data/" + factTableName + ".csv";
-
- File tmpFile = File.createTempFile(factTableName, "csv");
- FileOutputStream out = new FileOutputStream(tmpFile);
-
- try {
- if (store.exists(factTablePath)) {
- InputStream oldContent = store.getResource(factTablePath).inputStream;
- IOUtils.copy(oldContent, out);
- }
- IOUtils.copy(in, out);
- IOUtils.closeQuietly(in);
-
- store.deleteResource(factTablePath);
- in = new FileInputStream(tmpFile);
- store.putResource(factTablePath, in, System.currentTimeMillis());
- } finally {
- IOUtils.closeQuietly(out);
- IOUtils.closeQuietly(in);
- }
-
- }
private static void deployHiveTables() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
deleted file mode 100644
index a5132af..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class Kafka10DataLoader extends StreamDataLoader {
- private static final Logger logger = LoggerFactory.getLogger(Kafka10DataLoader.class);
- List<KafkaClusterConfig> kafkaClusterConfigs;
-
- public Kafka10DataLoader(KafkaConfig kafkaConfig) {
- super(kafkaConfig);
- this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
- }
-
- public void loadIntoKafka(List<String> messages) {
-
- KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
- String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
- @Nullable
- @Override
- public String apply(BrokerConfig brokerConfig) {
- return brokerConfig.getHost() + ":" + brokerConfig.getPort();
- }
- }), ",");
-
- Properties props = new Properties();
- props.put("acks", "1");
- props.put("retry.backoff.ms", "1000");
- KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
-
- int boundary = messages.size() / 10;
- for (int i = 0; i < messages.size(); ++i) {
- ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
- producer.send(keyedMessage);
- if (i % boundary == 0) {
- logger.info("sending " + i + " messages to " + this.toString());
- }
- }
- logger.info("sent " + messages.size() + " messages to " + this.toString());
- producer.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fafb1fc..79ee084 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -717,7 +717,6 @@ abstract public class KylinConfigBase implements Serializable {
Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine."));
// ref constants in ISourceAware
r.put(0, "org.apache.kylin.source.hive.HiveSource");
- r.put(1, "org.apache.kylin.source.kafka.KafkaSource");
return r;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index afb0d28..79397c3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -37,7 +37,6 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
new file mode 100644
index 0000000..2e1f214
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+public interface ISegment {
+
+ public String getName();
+
+ public long getDateRangeStart();
+
+ public long getDateRangeEnd();
+
+ public long getSourceOffsetStart();
+
+ public long getSourceOffsetEnd();
+
+ public DataModelDesc getModel();
+
+ public SegmentStatusEnum getStatus();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
index 889a0b2..e31111d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
import org.apache.kylin.dimension.AbstractDateDimEnc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.metadata.datatype.DataType;
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6ca89c8..6aeb617 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -26,7 +26,6 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -163,9 +162,4 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
return cubeDesc.getDistributedByColumn();
}
- @Override
- public ISegment getSegment() {
- return cubeSegment;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 8af2297..5212859 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -25,7 +25,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -138,9 +137,4 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
return flatDesc.getDistributedBy();
}
- @Override
- public ISegment getSegment() {
- return flatDesc.getSegment();
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
index 4213cf3..bb7503a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.cube.ISegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index ffa2680..f3a4107 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -37,6 +37,4 @@ public interface IJoinedFlatTableDesc {
long getSourceOffsetEnd();
TblColRef getDistributedBy();
-
- ISegment getSegment();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
deleted file mode 100644
index f69ae3f..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.model;
-
-public interface ISegment {
-
- public String getName();
-
- public long getDateRangeStart();
-
- public long getDateRangeEnd();
-
- public long getSourceOffsetStart();
-
- public long getSourceOffsetEnd();
-
- public DataModelDesc getModel();
-
- public SegmentStatusEnum getStatus();
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index badf628..129d525 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -34,12 +34,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
- private final IMRInput.IMRBatchMergeInputSide inputSide;
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
- this.inputSide = MRUtil.getBatchMergeInputSide(seg);
}
public CubingJob build() {
@@ -57,7 +55,6 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
}
// Phase 1: Merge Dictionary
- inputSide.addStepPhase1_MergeDictionary(result);
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 62cede9..582052f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
/**
@@ -35,9 +34,6 @@ public interface IMRInput {
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
- /** Return a helper to participate in batch cubing merge job flow. */
- public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
/**
* Utility that configures mapper to read from a table.
*/
@@ -71,10 +67,4 @@ public interface IMRInput {
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
- public interface IMRBatchMergeInputSide {
-
- /** Add step that executes before merge dictionary and before merge cube. */
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
-
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 67eef5e..2c3b77f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -71,10 +71,6 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
- public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
- return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
- }
-
// use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
// Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
public static int runMRJob(Tool tool, String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
index e6977e1..cfb889a 100644
--- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
@@ -4,7 +4,7 @@
"name": "test_streaming_table_model_desc",
"dimensions": [
{
- "table": "DEFAULT.STREAMING_TABLE",
+ "table": "default.streaming_table",
"columns": [
"minute_start",
"hour_start",
@@ -20,10 +20,10 @@
"item_count"
],
"last_modified": 0,
- "fact_table": "DEFAULT.STREAMING_TABLE",
+ "fact_table": "default.streaming_table",
"filter_condition": null,
"partition_desc": {
- "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start",
+ "partition_date_column": "default.streaming_table.minute_start",
"partition_date_start": 0,
"partition_type": "APPEND"
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 7f79acc..9490560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,36 +20,24 @@ package org.apache.kylin.provision;
import java.io.File;
import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.TimeZone;
import java.util.UUID;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.job.streaming.Kafka10DataLoader;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.Assert;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,123 +46,31 @@ import org.slf4j.LoggerFactory;
*/
public class BuildCubeWithStream {
- private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class);
-
- private CubeManager cubeManager;
- private DefaultScheduler scheduler;
- protected ExecutableManager jobService;
+ private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
private static final String cubeName = "test_streaming_table_cube";
+ private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
+ private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
+ private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
- private KafkaConfig kafkaConfig;
- private MockKafka kafkaServer;
-
- public void before() throws Exception {
- deployEnv();
-
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- jobService = ExecutableManager.getInstance(kylinConfig);
- scheduler = DefaultScheduler.createInstance();
- scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
- if (!scheduler.hasStarted()) {
- throw new RuntimeException("scheduler has not been started");
- }
- cubeManager = CubeManager.getInstance(kylinConfig);
-
- final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- final String factTable = cubeInstance.getFactTable();
-
- final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
- final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
- kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
-
- String topicName = UUID.randomUUID().toString();
- String localIp = NetworkUtils.getLocalIp();
- BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
- brokerConfig.setHost(localIp);
- kafkaConfig.setTopic(topicName);
- KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
-
- startEmbeddedKafka(topicName, brokerConfig);
- }
-
- private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
- //Start mock Kakfa
- String zkConnectionStr = "sandbox:2181";
- ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
- // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
- kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
- kafkaServer.start();
-
- kafkaServer.createTopic(topicName, 3, 1);
- kafkaServer.waitTopicUntilReady(topicName);
-
- MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
- Assert.assertEquals(topicName, topicMetadata.topic());
- }
-
- private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
- Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
- logger.info("Test data inserted into Kafka");
- }
+ private KylinConfig kylinConfig;
- private void clearSegment(String cubeName) throws Exception {
- CubeInstance cube = cubeManager.getCube(cubeName);
- // remove all existing segments
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
- cubeManager.updateCube(cubeBuilder);
- }
-
- public void build() throws Exception {
- clearSegment(cubeName);
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
- long date1 = 0;
- long date2 = f.parse("2013-01-01").getTime();
-
- int numberOfRecrods1 = 10000;
- generateStreamData(date1, date2, numberOfRecrods1);
- buildSegment(cubeName, 0, Long.MAX_VALUE);
-
- long date3 = f.parse("2013-04-01").getTime();
- int numberOfRecrods2 = 5000;
- generateStreamData(date2, date3, numberOfRecrods2);
- buildSegment(cubeName, 0, Long.MAX_VALUE);
-
- //merge
- mergeSegment(cubeName, 0, 15000);
-
- }
-
- private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
- DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
- jobService.addJob(job);
- waitForJob(job.getId());
- return job.getId();
- }
+ public static void main(String[] args) throws Exception {
- private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
- DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
- jobService.addJob(job);
- waitForJob(job.getId());
- return job.getId();
- }
+ try {
+ beforeClass();
- private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
- DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
- jobService.addJob(job);
- waitForJob(job.getId());
- return job.getId();
- }
+ BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+ buildCubeWithStream.before();
+ buildCubeWithStream.build();
+ logger.info("Build is done");
+ buildCubeWithStream.cleanup();
+ logger.info("Going to exit");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("error", e);
+ System.exit(1);
+ }
- protected void deployEnv() throws IOException {
- DeployUtil.overrideJobJarLocations();
- //DeployUtil.initCliWorkDir();
- //DeployUtil.deployMetadata();
}
public static void beforeClass() throws Exception {
@@ -187,54 +83,44 @@ public class BuildCubeWithStream {
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
- public static void afterClass() throws Exception {
- HBaseMetadataTestCase.staticCleanupTestMetadata();
+ protected void deployEnv() throws IOException {
+ DeployUtil.overrideJobJarLocations();
}
- public void after() {
- kafkaServer.stop();
- }
+ public void before() throws Exception {
+ deployEnv();
- protected void waitForJob(String jobId) {
- while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
- if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
- break;
- } else {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final String factTable = cubeInstance.getFactTable();
+ final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable);
- public static void main(String[] args) throws Exception {
- try {
- beforeClass();
+ //Use a random topic for kafka data stream
+ KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName());
+ streamingConfig.setTopic(UUID.randomUUID().toString());
+ KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
- BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
- buildCubeWithStream.before();
- buildCubeWithStream.build();
- logger.info("Build is done");
- buildCubeWithStream.after();
- afterClass();
- logger.info("Going to exit");
- System.exit(0);
- } catch (Exception e) {
- logger.error("error", e);
- System.exit(1);
- }
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
+ }
+ public void cleanup() throws Exception {
+ cleanupOldStorage();
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
}
protected int cleanupOldStorage() throws Exception {
String[] args = { "--delete", "true" };
- // KapStorageCleanupCLI cli = new KapStorageCleanupCLI();
- // cli.execute(args);
+ StorageCleanupJob cli = new StorageCleanupJob();
+ cli.execute(args);
return 0;
}
+ public void build() throws Exception {
+ logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
+ for (long start = startTime; start < endTime; start += batchInterval) {
+ logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval));
+ new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
deleted file mode 100644
index 3f47923..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.provision;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.kafka.common.requests.MetadataResponse;
-
-import kafka.admin.AdminUtils;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
-
-public class MockKafka {
- private static Properties createProperties(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
- Properties properties = new Properties();
- properties.put("port", port);
- properties.put("broker.id", brokerId);
- properties.put("log.dirs", logDir);
- properties.put("host.name", "localhost");
- properties.put("offsets.topic.replication.factor", "1");
- properties.put("delete.topic.enable", "true");
- properties.put("zookeeper.connect", zkServerConnection.getServers());
- String ip = NetworkUtils.getLocalIp();
- properties.put("listeners", "PLAINTEXT://" + ip + ":" + port);
- properties.put("advertised.listeners", "PLAINTEXT://" + ip + ":" + port);
- return properties;
- }
-
- private KafkaServerStartable kafkaServer;
-
- private ZkConnection zkConnection;
-
- public MockKafka(ZkConnection zkServerConnection) {
- this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), "9092", "1");
- start();
- }
-
- private MockKafka(Properties properties) {
- KafkaConfig kafkaConfig = new KafkaConfig(properties);
- kafkaServer = new KafkaServerStartable(kafkaConfig);
- }
-
- public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) {
- this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId));
- start();
- }
-
- private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
- this(createProperties(zkServerConnection, logDir, port, brokerId));
- this.zkConnection = zkServerConnection;
- System.out.println(String.format("Kafka %s:%s dir:%s", kafkaServer.serverConfig().brokerId(), kafkaServer.serverConfig().port(), kafkaServer.serverConfig().logDirs()));
- }
-
- public void createTopic(String topic, int partition, int replication) {
- ZkClient zkClient = new ZkClient(zkConnection);
- ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
- zkClient.setZkSerializer(new ZKStringSerializer());
- AdminUtils.createTopic(zkUtils, topic, partition, replication, new Properties(), null);
- zkClient.close();
- }
-
- public void createTopic(String topic) {
- this.createTopic(topic, 1, 1);
- }
-
- public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
- ZkClient zkClient = new ZkClient(zkConnection);
- ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
- zkClient.setZkSerializer(new ZKStringSerializer());
- MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
- zkClient.close();
- return topicMetadata;
- }
-
- /**
- * Delete may not work
- *
- * @param topic
- */
- public void deleteTopic(String topic) {
- ZkClient zkClient = new ZkClient(zkConnection);
- ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
- zkClient.setZkSerializer(new ZKStringSerializer());
- AdminUtils.deleteTopic(zkUtils, topic);
- zkClient.close();
- }
-
- public String getConnectionString() {
- return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port());
- }
-
- public void start() {
- kafkaServer.startup();
- System.out.println("embedded kafka is up");
- }
-
- public void stop() {
- kafkaServer.shutdown();
- System.out.println("embedded kafka down");
- }
-
- public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
- boolean isReady = false;
- MetadataResponse.TopicMetadata topicMeta = null;
- while (!isReady) {
- Random random = new Random();
- topicMeta = this.fetchTopicMeta(topic);
- List<MetadataResponse.PartitionMetadata> partitionsMetadata = topicMeta.partitionMetadata();
- Iterator<MetadataResponse.PartitionMetadata> iterator = partitionsMetadata.iterator();
- boolean hasGotLeader = true;
- boolean hasGotReplica = true;
- while (iterator.hasNext()) {
- MetadataResponse.PartitionMetadata partitionMeta = iterator.next();
- hasGotLeader &= (!partitionMeta.leader().isEmpty());
- if (partitionMeta.leader().isEmpty()) {
- System.out.println("Partition leader is not ready, wait 1s.");
- break;
- }
- hasGotReplica &= (!partitionMeta.replicas().isEmpty());
- if (partitionMeta.replicas().isEmpty()) {
- System.out.println("Partition replica is not ready, wait 1s.");
- break;
- }
- }
- isReady = hasGotLeader & hasGotReplica;
- if (!isReady) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- }
- }
- return topicMeta;
- }
-
- public String getZookeeperConnection() {
- return this.zkConnection.getServers();
- }
-}
-
-class ZKStringSerializer implements ZkSerializer {
-
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError {
- byte[] bytes = null;
- try {
- bytes = data.toString().getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new ZkMarshallingError(e);
- }
- return bytes;
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- if (bytes == null)
- return null;
- else
- try {
- return new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new ZkMarshallingError(e);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
deleted file mode 100644
index 98f6d04..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.provision;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-
-public class NetworkUtils {
-
- public static String getLocalIp() {
- try {
- Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface iface = interfaces.nextElement();
- if (iface.isLoopback() || !iface.isUp() || iface.isVirtual() || iface.isPointToPoint())
- continue;
- if (iface.getName().startsWith("vboxnet"))
- continue;
-
- Enumeration<InetAddress> addresses = iface.getInetAddresses();
- while (addresses.hasMoreElements()) {
- InetAddress addr = addresses.nextElement();
- final String ip = addr.getHostAddress();
- if (Inet4Address.class == addr.getClass())
- return ip;
- }
- }
- } catch (SocketException e) {
- throw new RuntimeException(e);
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 30d3324..1abc4eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<!-- HBase versions -->
<hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
- <kafka.version>0.10.0.0</kafka.version>
+ <kafka.version>0.8.1</kafka.version>
<!-- Hadoop deps, keep compatible with hadoop2.version -->
<zookeeper.version>3.4.6</zookeeper.version>
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 09ac522..520d7cc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -50,7 +50,6 @@ import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.LookupDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
@@ -70,16 +69,6 @@ public class HiveMRInput implements IMRInput {
return new HiveTableInputFormat(table.getIdentity());
}
- @Override
- public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
- return new IMRBatchMergeInputSide() {
- @Override
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
- // doing nothing
- }
- };
- }
-
public static class HiveTableInputFormat implements IMRTableInputFormat {
final String dbName;
final String tableName;
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 212f4c6..90c2211 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -32,11 +32,10 @@
</parent>
+ <properties>
+ </properties>
+
<dependencies>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-mr</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -61,10 +60,16 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
+
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index cfdf316..d594873 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -1,19 +1,35 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
*/
package org.apache.kylin.source.kafka;
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
deleted file mode 100644
index cfce137..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-public class KafkaMRInput implements IMRInput {
-
- CubeSegment cubeSegment;
-
- @Override
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- this.cubeSegment = (CubeSegment)flatDesc.getSegment();
- return new BatchCubingInputSide(cubeSegment);
- }
-
- @Override
- public IMRTableInputFormat getTableInputFormat(TableDesc table) {
- KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
- KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
- List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
- @Nullable
- @Override
- public TblColRef apply(ColumnDesc input) {
- return input.getRef();
- }
- });
-
- return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null);
- }
-
- @Override
- public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
- return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
- }
-
- public static class KafkaTableInputFormat implements IMRTableInputFormat {
- private final CubeSegment cubeSegment;
- private List<TblColRef> columns;
- private StreamingParser streamingParser;
- private KafkaConfig kafkaConfig;
- private final JobEngineConfig conf;
-
- public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
- this.cubeSegment = cubeSegment;
- this.columns = columns;
- this.kafkaConfig = kafkaConfig;
- this.conf = conf;
- }
-
- @Override
- public void configureJob(Job job) {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapOutputValueClass(Text.class);
- String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
- IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
- String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
- try {
- FileInputFormat.addInputPath(job, new Path(inputPath));
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public String[] parseMapperInput(Object mapperInput) {
- if (streamingParser == null) {
- try {
- streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
- } catch (ReflectiveOperationException e) {
- throw new IllegalArgumentException();
- }
- }
- Text text = (Text) mapperInput;
- ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice();
- StreamingMessage streamingMessage = streamingParser.parse(buffer);
- return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]);
- }
-
- }
-
- public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
- final JobEngineConfig conf;
- final CubeSegment seg;
- private String outputPath;
-
- public BatchCubingInputSide(CubeSegment seg) {
- this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- this.seg = seg;
- }
-
- @Override
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId()));
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
- }
-
- public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) {
- final SeekOffsetStep result = new SeekOffsetStep();
- result.setName("Seek and update offset step");
-
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-
- return result;
- }
-
- private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
- MapReduceExecutable result = new MapReduceExecutable();
-
- IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
- outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
- result.setName("Save data from Kafka");
- result.setMapReduceJobClass(KafkaFlatTableJob.class);
- JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
- StringBuilder cmd = new StringBuilder();
- jobBuilderSupport.appendMapReduceParameters(cmd);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
-
- result.setMapReduceParams(cmd.toString());
- return result;
- }
-
- @Override
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- final UpdateTimeRangeStep result = new UpdateTimeRangeStep();
- result.setName("Update Segment Time Range");
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
- JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM");
- result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId()));
- jobFlow.addTask(result);
-
- }
-
- @Override
- public IMRTableInputFormat getFlatTableInputFormat() {
- KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
- KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getRealization().getFactTable());
- List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
- return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
-
- }
-
- }
-
- class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
-
- private CubeSegment cubeSegment;
-
- KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
- this.cubeSegment = cubeSegment;
- }
-
- @Override
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-
- final MergeOffsetStep result = new MergeOffsetStep();
- result.setName("Merge offset step");
-
- CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
- jobFlow.addTask(result);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
deleted file mode 100644
index d039583..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import java.util.List;
-
-//used by reflection
-public class KafkaSource implements ISource {
-
- @SuppressWarnings("unchecked")
- @Override
- public <I> I adaptToBuildEngine(Class<I> engineInterface) {
- if (engineInterface == IMRInput.class) {
- return (I) new KafkaMRInput();
- } else {
- throw new RuntimeException("Cannot adapt to " + engineInterface);
- }
- }
-
- @Override
- public ReadableTable createReadableTable(TableDesc tableDesc) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<String> getMRDependentResources(TableDesc table) {
- List<String> dependentResources = Lists.newArrayList();
- dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
- dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
- return dependentResources;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index de42689..c3bdb75 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -1,20 +1,36 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
package org.apache.kylin.source.kafka;
import java.util.List;
@@ -24,9 +40,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import com.google.common.base.Function;
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
@@ -53,8 +66,6 @@ import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
-import javax.annotation.Nullable;
-
@SuppressWarnings("unused")
public class KafkaStreamingInput implements IStreamingInput {
@@ -140,16 +151,8 @@ public class KafkaStreamingInput implements IStreamingInput {
if (partitionMetadata.errorCode() != 0) {
logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
}
- replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
- @Nullable
- @Override
- public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
- return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
- }
- });
- BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
-
- return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
+ replicaBrokers = partitionMetadata.replicas();
+ return partitionMetadata.leader();
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
deleted file mode 100644
index a21b980..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-
-/**
- */
-public class MergeOffsetStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
- public MergeOffsetStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
- Map<Integer, Long> mergedStartOffsets = Maps.newHashMap();
- Map<Integer, Long> mergedEndOffsets = Maps.newHashMap();
-
- long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0;
- for (CubeSegment seg: mergingSegs) {
- Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(seg);
- Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(seg);
-
- for (Integer partition : startOffsets.keySet()) {
- long currentStart = mergedStartOffsets.get(partition) != null ? Long.valueOf(mergedStartOffsets.get(partition)) : Long.MAX_VALUE;
- long currentEnd = mergedEndOffsets.get(partition) != null ? Long.valueOf(mergedEndOffsets.get(partition)) : 0;
- mergedStartOffsets.put(partition, Math.min(currentStart, startOffsets.get(partition)));
- mergedEndOffsets.put(partition, Math.max(currentEnd, endOffsets.get(partition)));
- }
- dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart());
- dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd());
- }
-
- KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets);
- KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets);
- segment.setDateRangeStart(dateRangeStart);
- segment.setDateRangeEnd(dateRangeEnd);
-
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
deleted file mode 100644
index 5dca93f..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SeekOffsetStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
-
- public SeekOffsetStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment);
- Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment);
-
- if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
- }
-
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
- final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
- if (startOffsets.isEmpty()) {
- // user didn't specify start offset, use the biggest offset in existing segments as start
- for (CubeSegment seg : cube.getSegments()) {
- Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg);
- for (PartitionInfo partition : partitionInfos) {
- int partitionId = partition.partition();
- if (segEndOffset.containsKey(partitionId)) {
- startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
- }
- }
- }
-
- if (partitionInfos.size() > startOffsets.size()) {
- // has new partition added
- for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
- long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
- startOffsets.put(partitionInfos.get(x).partition(), earliest);
- }
- }
-
- logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
- }
-
- if (endOffsets.isEmpty()) {
- // user didn't specify end offset, use latest offset in kafka
- for (PartitionInfo partitionInfo : partitionInfos) {
- long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
- endOffsets.put(partitionInfo.partition(), latest);
- }
-
- logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
- }
- }
-
- KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
- KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
-
- segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 6b7d658..cb6a72b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -1,20 +1,37 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 666297f..8888d67 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -1,20 +1,37 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
package org.apache.kylin.source.kafka;
import java.nio.ByteBuffer;