You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/10 05:38:01 UTC
[4/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
KYLIN-2072 Cleanup old streaming code
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5aee0226
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5aee0226
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5aee0226
Branch: refs/heads/master
Commit: 5aee022612c6fa40c41e8c00063714b79b6d5237
Parents: cb2b12b
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 13:10:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800
----------------------------------------------------------------------
assembly/pom.xml | 4 -
.../kylin/job/streaming/KafkaDataLoader.java | 79 ----
build/bin/cleanup_streaming_files.sh | 42 --
build/bin/kylin.sh | 61 ---
build/bin/streaming_build.sh | 33 --
build/bin/streaming_check.sh | 29 --
build/bin/streaming_fillgap.sh | 26 --
build/bin/streaming_rolllog.sh | 29 --
.../metadata/streaming/StreamingConfig.java | 85 ++++
.../metadata/streaming/StreamingManager.java | 248 ++++++++++++
.../.settings/org.eclipse.core.resources.prefs | 6 -
.../.settings/org.eclipse.jdt.core.prefs | 386 -------------------
.../.settings/org.eclipse.jdt.ui.prefs | 7 -
engine-streaming/pom.xml | 121 ------
.../kylin/engine/streaming/BootstrapConfig.java | 71 ----
.../kylin/engine/streaming/IStreamingInput.java | 30 --
.../engine/streaming/IStreamingOutput.java | 34 --
.../streaming/OneOffStreamingBuilder.java | 71 ----
.../engine/streaming/StreamingBatchBuilder.java | 43 ---
.../kylin/engine/streaming/StreamingConfig.java | 85 ----
.../engine/streaming/StreamingManager.java | 248 ------------
.../kylin/engine/streaming/cli/MonitorCLI.java | 88 -----
.../engine/streaming/cli/StreamingCLI.java | 114 ------
.../streaming/cube/StreamingCubeBuilder.java | 168 --------
.../diagnose/StreamingLogAnalyzer.java | 96 -----
.../streaming/monitor/StreamingMonitor.java | 172 ---------
.../engine/streaming/util/StreamingUtils.java | 51 ---
.../kylin/provision/BuildCubeWithStream.java | 4 +-
pom.xml | 6 -
.../rest/controller/StreamingController.java | 2 +-
.../kylin/rest/controller/TableController.java | 2 +-
.../apache/kylin/rest/service/BasicService.java | 2 +-
.../kylin/rest/service/StreamingService.java | 2 +-
source-kafka/pom.xml | 6 -
.../kafka/ByteBufferBackedInputStream.java | 52 ---
.../apache/kylin/source/kafka/KafkaSource.java | 2 +-
.../kylin/source/kafka/KafkaStreamingInput.java | 227 -----------
.../source/kafka/TimedJsonStreamParser.java | 1 +
.../kafka/diagnose/KafkaInputAnalyzer.java | 312 ---------------
.../source/kafka/diagnose/KafkaVerify.java | 101 -----
.../source/kafka/diagnose/TimeHistogram.java | 85 ----
.../kafka/util/ByteBufferBackedInputStream.java | 52 +++
.../kylin/source/kafka/util/KafkaRequester.java | 191 ---------
.../kylin/source/kafka/util/KafkaUtils.java | 173 ---------
.../test/java/TimedJsonStreamParserTest.java | 4 +-
storage-hbase/pom.xml | 4 -
.../hbase/steps/HBaseStreamingOutput.java | 98 -----
.../apache/kylin/tool/CubeMetaExtractor.java | 4 +-
48 files changed, 397 insertions(+), 3360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0c80afc..e6f83a8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -47,10 +47,6 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 454f6cf..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader extends StreamDataLoader {
- List<KafkaClusterConfig> kafkaClusterConfigs;
-
- public KafkaDataLoader(KafkaConfig kafkaConfig) {
- super(kafkaConfig);
- this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
- }
-
- public void loadIntoKafka(List<String> messages) {
-
- KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
- String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
- @Nullable
- @Override
- public String apply(BrokerConfig brokerConfig) {
- return brokerConfig.getHost() + ":" + brokerConfig.getPort();
- }
- }), ",");
- Properties props = new Properties();
- props.put("metadata.broker.list", brokerList);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
- props.put("retry.backoff.ms", "1000");
-
- ProducerConfig config = new ProducerConfig(props);
-
- Producer<String, String> producer = new Producer<String, String>(config);
-
- List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
- for (int i = 0; i < messages.size(); ++i) {
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
- keyedMessages.add(keyedMessage);
- }
- producer.send(keyedMessages);
- producer.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/cleanup_streaming_files.sh
----------------------------------------------------------------------
diff --git a/build/bin/cleanup_streaming_files.sh b/build/bin/cleanup_streaming_files.sh
deleted file mode 100644
index 9b31a4f..0000000
--- a/build/bin/cleanup_streaming_files.sh
+++ /dev/null
@@ -1,42 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-if [ $# != 1 ]
-then
- echo 'invalid input'
- exit -1
-fi
-
-cd $KYLIN_HOME/logs
-
-for pidfile in `find -L . -name "$1_1*"`
-do
- pidfile=`echo "$pidfile" | cut -c 3-`
- echo "pidfile:$pidfile"
- pid=`cat $pidfile`
- if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ]
- then
- echo "pid:$pid still running"
- else
- echo "pid:$pid not running, try to delete files"
- echo $pidfile | xargs rm
- echo "streaming_$pidfile.log" | xargs rm
- fi
-done
-
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index e767492..039be9f 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -139,67 +139,6 @@ then
exit 1
fi
-# streaming command
-elif [ "$1" == "streaming" ]
-then
- if [ $# -lt 4 ]
- then
- echo "invalid input args $@"
- exit -1
- fi
- if [ "$2" == "start" ]
- then
- retrieveDependency
- source ${dir}/find-kafka-dependency.sh
-
- # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
- hbase ${KYLIN_EXTRA_START_OPTS} \
- -Dlog4j.configuration=kylin-log4j.properties\
- -Dkylin.hive.dependency=${hive_dependency} \
- -Dkylin.kafka.dependency=${kafka_dependency} \
- -Dkylin.hbase.dependency=${hbase_dependency} \
- org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
- echo "streaming started name: $3 id: $4"
- exit 0
- elif [ "$2" == "stop" ]
- then
- if [ ! -f "${KYLIN_HOME}/$3_$4" ]
- then
- echo "streaming is not running, please check"
- exit 1
- fi
- pid=`cat ${KYLIN_HOME}/$3_$4`
- if [ "$pid" = "" ]
- then
- echo "streaming is not running, please check"
- exit 1
- else
- echo "stopping streaming:$pid"
- kill $pid
- fi
- rm ${KYLIN_HOME}/$3_$4
- exit 0
- else
- echo
- fi
-
-# monitor command
-elif [ "$1" == "monitor" ]
-then
- echo "monitor job"
-
- retrieveDependency
- source ${dir}/find-kafka-dependency.sh
-
- # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
- hbase ${KYLIN_EXTRA_START_OPTS} \
- -Dlog4j.configuration=kylin-log4j.properties\
- -Dkylin.hive.dependency=${hive_dependency} \
- -Dkylin.kafka.dependency=${kafka_dependency} \
- -Dkylin.hbase.dependency=${hbase_dependency} \
- org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
- exit 0
-
elif [ "$1" = "version" ]
then
exec hbase -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.common.KylinVersion
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
deleted file mode 100644
index ed19036..0000000
--- a/build/bin/streaming_build.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-CUBE=$1
-INTERVAL=$2
-DELAY=$3
-CURRENT_TIME_IN_SECOND=`date +%s`
-CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
-START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
-END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
-
-ID="$START"_"$END"
-echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_check.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh
deleted file mode 100644
index fef0139..0000000
--- a/build/bin/streaming_check.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-receivers=$1
-host=$2
-tablename=$3
-authorization=$4
-projectname=$5
-cubename=$6
-sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
deleted file mode 100644
index c67809a..0000000
--- a/build/bin/streaming_fillgap.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-cube=$1
-
-cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_rolllog.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_rolllog.sh b/build/bin/streaming_rolllog.sh
deleted file mode 100644
index 8018eb8..0000000
--- a/build/bin/streaming_rolllog.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-KYLIN_LOG_HOME=${KYLIN_HOME}/logs
-cd ${KYLIN_LOG_HOME}
-timestamp=`date +%Y_%m_%d_%H_%M_%S`
-tarfile=logs_archived_at_${timestamp}.tar
-files=`find -L . ! -name '*.tar' -type f -mtime +1` # keep two days' log
-echo ${files} | xargs tar -cvf ${tarfile}
-echo ${files} | xargs rm
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
new file mode 100644
index 0000000..9fd6ede
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class StreamingConfig extends RootPersistentEntity {
+
+ public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+ public static final String STREAMING_TYPE_KAFKA = "kafka";
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("type")
+ private String type = STREAMING_TYPE_KAFKA;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getResourcePath() {
+ return concatResourcePath(name);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public static String concatResourcePath(String name) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+ }
+
+ @Override
+ public StreamingConfig clone() {
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SERIALIZER.serialize(this, new DataOutputStream(baos));
+ return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+ } catch (IOException e) {
+ throw new RuntimeException(e);//in mem, should not happen
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
new file mode 100644
index 0000000..8cfe87d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class StreamingManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
+
+ // static cached instances
+ private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+
+ public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+ private KylinConfig config;
+
+ // name ==> StreamingConfig
+ private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ private StreamingManager(KylinConfig config) throws IOException {
+ this.config = config;
+ this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
+
+ // touch lower level metadata before registering my listener
+ reloadAllStreaming();
+ Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
+ }
+
+ private class StreamingSyncListener extends Broadcaster.Listener {
+ @Override
+ public void onClearAll(Broadcaster broadcaster) throws IOException {
+ clearCache();
+ }
+
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+ if (event == Event.DROP)
+ removeStreamingLocal(cacheKey);
+ else
+ reloadStreamingConfigLocal(cacheKey);
+ }
+ }
+
+ private ResourceStore getStore() {
+ return ResourceStore.getStore(this.config);
+ }
+
+ public static StreamingManager getInstance(KylinConfig config) {
+ StreamingManager r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ synchronized (StreamingManager.class) {
+ r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+ try {
+ r = new StreamingManager(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one singleton exist");
+ }
+ return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+ }
+ }
+ }
+
+ private static String formatStreamingConfigPath(String name) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+ }
+
+ private static String formatStreamingOutputPath(String streaming, int partition) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
+ }
+
+ private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+ }
+
+ public StreamingConfig getStreamingConfig(String name) {
+ return streamingMap.get(name);
+ }
+
+ public List<StreamingConfig> listAllStreaming() {
+ return new ArrayList<>(streamingMap.values());
+ }
+
+ /**
+ * Reload StreamingConfig from resource store It will be triggered by an desc
+ * update event.
+ *
+ * @param name
+ * @throws IOException
+ */
+ public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+ // Save Source
+ String path = StreamingConfig.concatResourcePath(name);
+
+ // Reload the StreamingConfig
+ StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+ // Here replace the old one
+ streamingMap.putLocal(ndesc.getName(), ndesc);
+ return ndesc;
+ }
+
+ // remove streamingConfig
+ public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+ String path = streamingConfig.getResourcePath();
+ getStore().deleteResource(path);
+ streamingMap.remove(streamingConfig.getName());
+ }
+
+ public StreamingConfig getConfig(String name) {
+ name = name.toUpperCase();
+ return streamingMap.get(name);
+ }
+
+ public void removeStreamingLocal(String streamingName) {
+ streamingMap.removeLocal(streamingName);
+ }
+
+ /**
+ * Update CubeDesc with the input. Broadcast the event into cluster
+ *
+ * @param desc
+ * @return
+ * @throws IOException
+ */
+ public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+ // Validate CubeDesc
+ if (desc.getUuid() == null || desc.getName() == null) {
+ throw new IllegalArgumentException("SteamingConfig Illegal.");
+ }
+ String name = desc.getName();
+ if (!streamingMap.containsKey(name)) {
+ throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
+ }
+
+ // Save Source
+ String path = desc.getResourcePath();
+ getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+ // Reload the StreamingConfig
+ StreamingConfig ndesc = loadStreamingConfigAt(path);
+ // Here replace the old one
+ streamingMap.put(ndesc.getName(), desc);
+
+ return ndesc;
+ }
+
+ public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+ if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
+ throw new IllegalArgumentException();
+ }
+
+ if (streamingMap.containsKey(streamingConfig.getName()))
+ throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
+ String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
+ getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+ streamingMap.put(streamingConfig.getName(), streamingConfig);
+ return streamingConfig;
+ }
+
+ private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
+ ResourceStore store = getStore();
+ StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+ if (StringUtils.isBlank(streamingDesc.getName())) {
+ throw new IllegalStateException("StreamingConfig name must not be blank");
+ }
+ return streamingDesc;
+ }
+
+ private void reloadAllStreaming() throws IOException {
+ ResourceStore store = getStore();
+ logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+ streamingMap.clear();
+
+ List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+ for (String path : paths) {
+ StreamingConfig streamingConfig;
+ try {
+ streamingConfig = loadStreamingConfigAt(path);
+ } catch (Exception e) {
+ logger.error("Error loading streaming desc " + path, e);
+ continue;
+ }
+ if (path.equals(streamingConfig.getResourcePath()) == false) {
+ logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+ continue;
+ }
+ if (streamingMap.containsKey(streamingConfig.getName())) {
+ logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+ continue;
+ }
+
+ streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+ }
+
+ logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.core.resources.prefs b/engine-streaming/.settings/org.eclipse.core.resources.prefs
deleted file mode 100644
index 29abf99..0000000
--- a/engine-streaming/.settings/org.eclipse.core.resources.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-eclipse.preferences.version=1
-encoding//src/main/java=UTF-8
-encoding//src/main/resources=UTF-8
-encoding//src/test/java=UTF-8
-encoding//src/test/resources=UTF-8
-encoding/<project>=UTF-8
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.core.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.core.prefs b/engine-streaming/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 5aaaf1e..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,386 +0,0 @@
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.annotation.inheritNullAnnotations=disabled
-org.eclipse.jdt.core.compiler.annotation.missingNonNullByDefaultAnnotation=ignore
-org.eclipse.jdt.core.compiler.annotation.nonnull=org.eclipse.jdt.annotation.NonNull
-org.eclipse.jdt.core.compiler.annotation.nonnull.secondary=
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annotation.NonNullByDefault
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable
-org.eclipse.jdt.core.compiler.annotation.nullable.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
-org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
-org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.7
-org.eclipse.jdt.core.compiler.debug.lineNumber=generate
-org.eclipse.jdt.core.compiler.debug.localVariable=generate
-org.eclipse.jdt.core.compiler.debug.sourceFile=generate
-org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=warning
-org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
-org.eclipse.jdt.core.compiler.problem.autoboxing=ignore
-org.eclipse.jdt.core.compiler.problem.comparingIdentical=warning
-org.eclipse.jdt.core.compiler.problem.deadCode=warning
-org.eclipse.jdt.core.compiler.problem.deprecation=warning
-org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled
-org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled
-org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore
-org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore
-org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
-org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore
-org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled
-org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore
-org.eclipse.jdt.core.compiler.problem.finalParameterBound=warning
-org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=warning
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore
-org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=warning
-org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled
-org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=warning
-org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=warning
-org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore
-org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore
-org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=warning
-org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore
-org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled
-org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=enabled
-org.eclipse.jdt.core.compiler.problem.missingSerialVersion=warning
-org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore
-org.eclipse.jdt.core.compiler.problem.noEffectAssignment=warning
-org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=warning
-org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore
-org.eclipse.jdt.core.compiler.problem.nonnullParameterAnnotationDropped=warning
-org.eclipse.jdt.core.compiler.problem.nonnullTypeVariableFromLegacyInvocation=warning
-org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error
-org.eclipse.jdt.core.compiler.problem.nullReference=warning
-org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error
-org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=warning
-org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=warning
-org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.pessimisticNullAnalysisForFreeTypeVariables=warning
-org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore
-org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore
-org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore
-org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled
-org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=warning
-org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=disabled
-org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled
-org.eclipse.jdt.core.compiler.problem.syntacticNullAnalysisForFields=disabled
-org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore
-org.eclipse.jdt.core.compiler.problem.typeParameterHiding=warning
-org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=enabled
-org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore
-org.eclipse.jdt.core.compiler.problem.unclosedCloseable=warning
-org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore
-org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore
-org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore
-org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=disabled
-org.eclipse.jdt.core.compiler.problem.unusedExceptionParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedImport=warning
-org.eclipse.jdt.core.compiler.problem.unusedLabel=warning
-org.eclipse.jdt.core.compiler.problem.unusedLocal=warning
-org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=disabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled
-org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
-org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.7
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=0
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_method_declaration=0
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=false
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true
-org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries=true
-org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments=false
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off
-org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=999
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
-org.eclipse.jdt.core.formatter.tabulation.char=space
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_on_off_tags=false
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
-org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
-org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs b/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
deleted file mode 100644
index d521bab..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
+++ /dev/null
@@ -1,7 +0,0 @@
-eclipse.preferences.version=1
-formatter_profile=_Space Indent & Long Lines
-formatter_settings_version=12
-org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;org;com;
-org.eclipse.jdt.ui.ondemandthreshold=99
-org.eclipse.jdt.ui.staticondemandthreshold=99
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
deleted file mode 100644
index 876279d..0000000
--- a/engine-streaming/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>kylin-engine-streaming</artifactId>
- <packaging>jar</packaging>
- <name>Apache Kylin - Streaming Engine</name>
-
- <parent>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin</artifactId>
- <version>1.6.0-SNAPSHOT</version>
-
- </parent>
-
- <properties>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-cube</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-storage</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-job</artifactId>
- </dependency>
-
- <!-- Env & Test -->
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- <!-- protobuf version conflict with hbase -->
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.mrunit</groupId>
- <artifactId>mrunit</artifactId>
- <classifier>hadoop2</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
deleted file mode 100644
index 35bdfa8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming;
-
-/**
- */
-public class BootstrapConfig {
-
- private String cubeName;
- private long start = 0L;
- private long end = 0L;
-
- private boolean fillGap;
- private long maxFillGapRange = 4 * 3600 * 1000L;
-
- public long getStart() {
- return start;
- }
-
- public void setStart(long start) {
- this.start = start;
- }
-
- public long getEnd() {
- return end;
- }
-
- public void setEnd(long end) {
- this.end = end;
- }
-
- public String getCubeName() {
- return cubeName;
- }
-
- public void setCubeName(String cubeName) {
- this.cubeName = cubeName;
- }
-
- public boolean isFillGap() {
- return fillGap;
- }
-
- public void setFillGap(boolean fillGap) {
- this.fillGap = fillGap;
- }
-
- public long getMaxFillGapRange() {
- return maxFillGapRange;
- }
-
- public void setMaxFillGapRange(long maxFillGapRange) {
- this.maxFillGapRange = maxFillGapRange;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
deleted file mode 100644
index c583283..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-/**
- */
-public interface IStreamingInput {
-
- StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime);
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
deleted file mode 100644
index cb15e2b..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-
-/**
- */
-public interface IStreamingOutput {
-
- ICuboidWriter getCuboidWriter(IBuildable buildable);
-
- void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult);
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
deleted file mode 100644
index c9da46e..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.engine.streaming.util.StreamingUtils;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class OneOffStreamingBuilder {
-
- private final IStreamingInput streamingInput;
- private final IStreamingOutput streamingOutput;
- private final StreamingBatchBuilder streamingBatchBuilder;
- private final long startTime;
- private final long endTime;
- private final RealizationType realizationType;
- private final String realizationName;
-
- public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) {
- Preconditions.checkArgument(startTime < endTime);
- this.startTime = startTime;
- this.endTime = endTime;
- this.realizationType = Preconditions.checkNotNull(realizationType);
- this.realizationName = Preconditions.checkNotNull(realizationName);
- this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput());
- this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput());
- this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName));
- }
-
- public Runnable build() {
- return new Runnable() {
- @Override
- public void run() {
- StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime);
- final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch);
- final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch);
- final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable);
- streamingBatchBuilder.build(streamingBatch, dictionaryMap, streamingOutput.getCuboidWriter(buildable));
- streamingOutput.output(buildable, samplingResult);
- streamingBatchBuilder.commit(buildable);
- }
- };
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
deleted file mode 100644
index 8b0b8e6..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- */
-public interface StreamingBatchBuilder {
-
- IBuildable createBuildable(StreamingBatch streamingBatch);
-
- Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch);
-
- Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable);
-
- void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter);
-
- void commit(IBuildable buildable);
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
deleted file mode 100644
index 9d1a0b1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class StreamingConfig extends RootPersistentEntity {
-
- public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
- public static final String STREAMING_TYPE_KAFKA = "kafka";
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("type")
- private String type = STREAMING_TYPE_KAFKA;
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getResourcePath() {
- return concatResourcePath(name);
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public static String concatResourcePath(String name) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
- }
-
- @Override
- public StreamingConfig clone() {
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SERIALIZER.serialize(this, new DataOutputStream(baos));
- return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
- } catch (IOException e) {
- throw new RuntimeException(e);//in mem, should not happen
- }
- }
-
-}