Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/10 05:37:58 UTC
[1/6] kylin git commit: KYLIN-2055 Add an encoder for Boolean type
Repository: kylin
Updated Branches:
refs/heads/master 71f373507 -> ed643e6b2
KYLIN-2055 Add an encoder for Boolean type
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cb2b12b3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cb2b12b3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cb2b12b3
Branch: refs/heads/master
Commit: cb2b12b3b619ac86efbb9c7ca708418882683daf
Parents: 71f3735
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 10 13:30:27 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:30:27 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/dimension/BooleanDimEnc.java | 196 +++++++++++++++++++
.../dimension/DimensionEncodingFactory.java | 1 +
.../kylin/dimension/BooleanDimEncTest.java | 95 +++++++++
.../cubeDesigner/advanced_settings.html | 2 +-
4 files changed, 293 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
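
For quick orientation, the sketch below is not part of the commit: it shows how the new encoder is expected to round-trip a value and mirrors the BooleanDimEncTest added further down; the wrapper class name BooleanDimEncExample is purely illustrative.

import java.nio.ByteBuffer;

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.dimension.BooleanDimEnc;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;

public class BooleanDimEncExample {
    public static void main(String[] args) {
        BooleanDimEnc enc = new BooleanDimEnc();

        // byte[]-based round trip: the literal is mapped to a one-byte code and decoded back
        byte[] buf = new byte[enc.getLengthOfEncoding()];
        byte[] raw = Bytes.toBytes("true");
        enc.encode(raw, raw.length, buf, 0);
        System.out.println(enc.decode(buf, 0, buf.length)); // prints: true

        // serializer-based round trip, the path exercised by testSerDes below
        DataTypeSerializer<Object> ser = enc.asDataTypeSerializer();
        byte[] buf2 = new byte[enc.getLengthOfEncoding()];
        ser.serialize("yes", ByteBuffer.wrap(buf2));
        System.out.println(ser.deserialize(ByteBuffer.wrap(buf2))); // prints: yes
    }
}

Any literal outside ALLOWED_VALUES raises an IllegalArgumentException, which is exactly what the negative test cases in BooleanDimEncTest assert.
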
http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
new file mode 100644
index 0000000..f32724c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dimension;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * Encoding Boolean values to bytes
+ */
+public class BooleanDimEnc extends DimensionEncoding {
+ private static final long serialVersionUID = 1L;
+
+ public static final String ENCODING_NAME = "boolean";
+
+ //NOTE: when adding a new value, append it to the tail of the array; DO NOT insert in the middle!
+ public static String[] ALLOWED_VALUES = new String[] { "", "true", "false", "TRUE", "FALSE", "True", "False", "t", "f", "T", "F", "yes", "no", "YES", "NO", "Yes", "No", "y", "n", "Y", "N", "1", "0" };
+
+ public static final Map<String, Integer> map = Maps.newHashMap();
+
+ static {
+ for (int i = 0; i < ALLOWED_VALUES.length; i++) {
+ map.put(ALLOWED_VALUES[i], i);
+ }
+ }
+
+ public static class Factory extends DimensionEncodingFactory {
+ @Override
+ public String getSupportedEncodingName() {
+ return ENCODING_NAME;
+ }
+
+ @Override
+ public DimensionEncoding createDimensionEncoding(String encodingName, String[] args) {
+ return new BooleanDimEnc();
+ }
+ };
+
+ // ============================================================================
+
+ private static int fixedLen = 1;
+
+ //no-arg constructor is required for Externalizable
+ public BooleanDimEnc() {
+ }
+
+ @Override
+ public int getLengthOfEncoding() {
+ return fixedLen;
+ }
+
+ @Override
+ public void encode(byte[] value, int valueLen, byte[] output, int outputOffset) {
+ if (value == null) {
+ Arrays.fill(output, outputOffset, outputOffset + fixedLen, NULL);
+ return;
+ }
+
+ encode(Bytes.toString(value, 0, valueLen), output, outputOffset);
+ }
+
+ void encode(String valueStr, byte[] output, int outputOffset) {
+ if (valueStr == null) {
+ Arrays.fill(output, outputOffset, outputOffset + fixedLen, NULL);
+ return;
+ }
+
+ Integer encodeValue = map.get(valueStr);
+ if (encodeValue == null) {
+ throw new IllegalArgumentException("Value '" + valueStr + "' is not a recognized boolean value.");
+ }
+
+ BytesUtil.writeLong(encodeValue, output, outputOffset, fixedLen);
+ }
+
+ @Override
+ public String decode(byte[] bytes, int offset, int len) {
+ if (isNull(bytes, offset, len)) {
+ return null;
+ }
+
+ int x = (int) BytesUtil.readLong(bytes, offset, len);
+ if (x >= ALLOWED_VALUES.length) {
+ throw new IllegalStateException();
+ }
+
+ return ALLOWED_VALUES[x];
+ }
+
+ @Override
+ public DataTypeSerializer<Object> asDataTypeSerializer() {
+ return new BooleanSerializer();
+ }
+
+ private class BooleanSerializer extends DataTypeSerializer<Object> {
+ // thread-local buffer: keeps this serializer thread-safe and avoids repeated object creation
+ private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
+
+ private byte[] currentBuf() {
+ byte[] buf = current.get();
+ if (buf == null) {
+ buf = new byte[fixedLen];
+ current.set(buf);
+ }
+ return buf;
+ }
+
+ @Override
+ public void serialize(Object value, ByteBuffer out) {
+ byte[] buf = currentBuf();
+ String valueStr = value == null ? null : value.toString();
+ encode(valueStr, buf, 0);
+ out.put(buf);
+ }
+
+ @Override
+ public Object deserialize(ByteBuffer in) {
+ byte[] buf = currentBuf();
+ in.get(buf);
+ return decode(buf, 0, buf.length);
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return fixedLen;
+ }
+
+ @Override
+ public int maxLength() {
+ return fixedLen;
+ }
+
+ @Override
+ public int getStorageBytesEstimate() {
+ return fixedLen;
+ }
+
+ @Override
+ public Object valueOf(String str) {
+ return str;
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(fixedLen);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fixedLen = in.readShort();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ BooleanDimEnc that = (BooleanDimEnc) o;
+
+ return fixedLen == that.fixedLen;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return fixedLen;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
index 27bebd7..aba0c26 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
@@ -79,6 +79,7 @@ public abstract class DimensionEncodingFactory {
map.put(FixedLenHexDimEnc.ENCODING_NAME, new FixedLenHexDimEnc.Factory());
map.put(DateDimEnc.ENCODING_NAME, new DateDimEnc.Factory());
map.put(TimeDimEnc.ENCODING_NAME, new TimeDimEnc.Factory());
+ map.put(BooleanDimEnc.ENCODING_NAME, new BooleanDimEnc.Factory());
// custom encodings
String[] clsNames = KylinConfig.getInstanceFromEnv().getCubeDimensionCustomEncodingFactories();
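
The single added line above registers the encoder under the name "boolean". A minimal sketch, assuming only the classes shown in this commit (the wrapper class name is hypothetical), of resolving the encoding through its factory:

import org.apache.kylin.dimension.BooleanDimEnc;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.dimension.DimensionEncodingFactory;

public class BooleanEncodingLookupSketch {
    public static void main(String[] args) {
        // BooleanDimEnc takes no encoding arguments, so an empty args array is passed.
        DimensionEncodingFactory factory = new BooleanDimEnc.Factory();
        DimensionEncoding enc = factory.createDimensionEncoding(BooleanDimEnc.ENCODING_NAME, new String[0]);
        System.out.println(enc.getLengthOfEncoding()); // prints: 1 (a single byte per value)
    }
}
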
http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java b/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java
new file mode 100644
index 0000000..c6c1416
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/dimension/BooleanDimEncTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dimension;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BooleanDimEncTest {
+
+ @Test
+ public void testNull() {
+ BooleanDimEnc enc = new BooleanDimEnc();
+
+ byte[] buf = new byte[enc.getLengthOfEncoding()];
+ enc.encode(null, 0, buf, 0);
+ Assert.assertTrue(DimensionEncoding.isNull(buf, 0, buf.length));
+ String decode = enc.decode(buf, 0, buf.length);
+ Assert.assertEquals(null, decode);
+
+ buf = new byte[enc.getLengthOfEncoding()];
+ DataTypeSerializer<Object> ser = enc.asDataTypeSerializer();
+ ser.serialize(null, ByteBuffer.wrap(buf));
+ Assert.assertTrue(DimensionEncoding.isNull(buf, 0, buf.length));
+ decode = (String) ser.deserialize(ByteBuffer.wrap(buf));
+ Assert.assertEquals(null, decode);
+ }
+
+ @Test
+ public void testEncodeDecode() {
+ BooleanDimEnc enc = new BooleanDimEnc();
+
+ for (String x : BooleanDimEnc.ALLOWED_VALUES) {
+ testEncodeDecode(enc, x);
+ }
+
+ try {
+ testEncodeDecode(enc, "FAlse");
+ Assert.fail();
+ } catch (Throwable e) {
+ Assert.assertEquals("Value 'FAlse' is not a recognized boolean value.", e.getMessage());
+ }
+ }
+
+ private void testEncodeDecode(BooleanDimEnc enc, String valueStr) {
+ byte[] buf = new byte[enc.getLengthOfEncoding()];
+ byte[] bytes = Bytes.toBytes(valueStr);
+ enc.encode(bytes, bytes.length, buf, 0);
+ String decode = enc.decode(buf, 0, buf.length);
+ Assert.assertEquals(valueStr, decode);
+ }
+
+ @Test
+ public void testSerDes() {
+ BooleanDimEnc enc = new BooleanDimEnc();
+ for (String x : BooleanDimEnc.ALLOWED_VALUES) {
+ testSerDes(enc, x);
+ }
+
+ try {
+ testSerDes(enc, "FAlse");
+ Assert.fail();
+ } catch (Throwable e) {
+ Assert.assertEquals("Value 'FAlse' is not a recognized boolean value.", e.getMessage());
+ }
+ }
+
+ private void testSerDes(BooleanDimEnc enc, String valueStr) {
+ DataTypeSerializer<Object> ser = enc.asDataTypeSerializer();
+ byte[] buf = new byte[enc.getLengthOfEncoding()];
+ ser.serialize(valueStr, ByteBuffer.wrap(buf));
+ String decode = (String) ser.deserialize(ByteBuffer.wrap(buf));
+ Assert.assertEquals(valueStr, decode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/webapp/app/partials/cubeDesigner/advanced_settings.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index e8cbf9e..34fd492 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -244,7 +244,7 @@
<!--Column Length -->
<input type="text" class="form-control" placeholder="Column Length.." ng-if="state.mode=='edit'"
tooltip="rowkey column length.." tooltip-trigger="focus"
- ng-disabled="rowkey_column.encoding=='dict'||rowkey_column.encoding=='date'||rowkey_column.encoding=='time'"
+ ng-disabled="rowkey_column.encoding=='dict'||rowkey_column.encoding=='date'||rowkey_column.encoding=='time'||rowkey_column.encoding=='boolean'"
ng-change="refreshRowKey(convertedRowkeys,$index,rowkey_column);"
ng-model="rowkey_column.valueLength" class="form-control">
[2/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
deleted file mode 100644
index 1c579c6..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class TimeHistogram {
- private long[] bucketsBoundary;
- private AtomicLong[] counters;
- private String id;
-
- private static Object printLock = new Object();
-
- /**
- * example: [10,20] will generate three buckets: (-∞,10), [10,20), [20,+∞)
- * unit: second
- */
- public TimeHistogram(long[] bucketsBoundary, String id) {
- this.bucketsBoundary = bucketsBoundary;
- this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
- for (int i = 0; i < counters.length; i++) {
- this.counters[i] = new AtomicLong();
- }
- this.id = id;
- }
-
- /**
- * @param second in seconds
- */
- public void process(long second) {
- for (int i = 0; i < bucketsBoundary.length; ++i) {
- if (second < bucketsBoundary[i]) {
- counters[i].incrementAndGet();
- return;
- }
- }
-
- counters[bucketsBoundary.length].incrementAndGet();
- }
-
- /**
- * @param millis in milli seconds
- */
- public void processMillis(long millis) {
- process(millis / 1000);
- }
-
- public void printStatus() {
- long[] countersSnapshot = new long[counters.length];
- for (int i = 0; i < countersSnapshot.length; i++) {
- countersSnapshot[i] = counters[i].get();
- }
-
- long sum = 0;
- for (long counter : countersSnapshot) {
- sum += counter;
- }
-
- synchronized (printLock) {
- System.out.println("============== status of TimeHistogram " + id + " =================");
-
- for (int i = 0; i < countersSnapshot.length; ++i) {
- System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 1 : sum)));
- }
-
- }
- }
-
-}
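
For context on what the cleanup removes: the sketch below is illustrative only (it is not part of any commit) and shows how the deleted TimeHistogram helper was meant to be used, following its own javadoc on bucket boundaries.

import org.apache.kylin.source.kafka.diagnose.TimeHistogram;

public class TimeHistogramSketch {
    public static void main(String[] args) {
        // Boundaries [10, 20] (in seconds) yield three buckets: (-inf, 10), [10, 20), [20, +inf)
        TimeHistogram h = new TimeHistogram(new long[] { 10, 20 }, "kafka-fetch");
        h.process(3);           // 3 s   -> bucket 0
        h.processMillis(15000); // 15 s  -> bucket 1
        h.process(42);          // 42 s  -> bucket 2
        h.printStatus();        // prints per-bucket counts and percentages
    }
}
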
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..7a42598
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public class ByteBufferBackedInputStream extends InputStream {
+
+ private ByteBuffer buf;
+
+ public ByteBufferBackedInputStream(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+ return buf.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] bytes, int off, int len) throws IOException {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+
+ len = Math.min(len, buf.remaining());
+ buf.get(bytes, off, len);
+ return len;
+ }
+}
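
A minimal usage sketch for the new adapter, not part of the commit (the JSON literal and the wrapper class name are made up): it lets stream-oriented parsers read a Kafka message payload straight from its ByteBuffer without an intermediate copy.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;

public class ByteBufferStreamSketch {
    public static void main(String[] args) throws IOException {
        // Wrap a message payload (here a hard-coded JSON string) so that stream-based
        // parsers can consume it without copying the buffer into a byte[] first.
        ByteBuffer payload = ByteBuffer.wrap("{\"ts\":1476077427000}".getBytes(StandardCharsets.UTF_8));
        try (InputStream in = new ByteBufferBackedInputStream(payload)) {
            byte[] chunk = new byte[8];
            int n;
            while ((n = in.read(chunk, 0, chunk.length)) != -1) {
                System.out.print(new String(chunk, 0, n, StandardCharsets.UTF_8));
            }
            System.out.println();
        }
    }
}
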
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
deleted file mode 100644
index bce9bb9..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.source.kafka.TopicMeta;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.TopicMetadataResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- */
-public final class KafkaRequester {
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class);
-
- private static ConcurrentMap<String, SimpleConsumer> consumerCache = Maps.newConcurrentMap();
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- KafkaRequester.shutdown();
- }
- }));
- }
-
- private static SimpleConsumer getSimpleConsumer(Broker broker, int timeout, int bufferSize, String clientId) {
- String key = createKey(broker, timeout, bufferSize, clientId);
- if (consumerCache.containsKey(key)) {
- return consumerCache.get(key);
- } else {
- BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
- consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId));
- return consumerCache.get(key);
- }
- }
-
- private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
- return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
- }
-
- public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
- SimpleConsumer consumer;
- for (Broker broker : kafkaClusterConfig.getBrokers()) {
- consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic());
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp;
- try {
- resp = consumer.send(req);
- } catch (Exception e) {
- logger.warn("cannot send TopicMetadataRequest successfully: " + e);
- continue;
- }
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- break;
- }
- List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
- @Nullable
- @Override
- public Integer apply(PartitionMetadata partitionMetadata) {
- return partitionMetadata.partitionId();
- }
- });
- return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds);
- }
- logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic());
- return null;
- }
-
- public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
- logger.debug("Brokers: " + brokers.toString());
- SimpleConsumer consumer;
- for (Broker broker : brokers) {
- consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
- List<String> topics = Collections.singletonList(topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- TopicMetadataResponse resp;
- try {
- resp = consumer.send(req);
- } catch (Exception e) {
- logger.warn("cannot send TopicMetadataRequest successfully: " + e);
- continue;
- }
- final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
- if (topicMetadatas.size() != 1) {
- logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
- break;
- }
- final TopicMetadata topicMetadata = topicMetadatas.get(0);
- if (topicMetadata.errorCode() != 0) {
- logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
- break;
- }
- for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
- StringBuffer logText = new StringBuffer();
- logText.append("PartitionMetadata debug errorCode: " + partitionMetadata.errorCode());
- logText.append("PartitionMetadata debug partitionId: " + partitionMetadata.partitionId());
- logText.append("PartitionMetadata debug leader: " + partitionMetadata.leader());
- logText.append("PartitionMetadata debug ISR: " + partitionMetadata.isr());
- logText.append("PartitionMetadata debug replica: " + partitionMetadata.replicas());
- logger.info(logText.toString());
- if (partitionMetadata.partitionId() == partitionId) {
- return partitionMetadata;
- }
- }
- }
- logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
- return null;
- }
-
- public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
- final String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
- kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 1048576 might need to be increased if large batches are written to Kafka; 1048576 is the default value of the shell consumer
- .build();
- return consumer.fetch(req);
- }
-
- public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
- String clientName = "client_" + topic + "_" + partitionId;
- SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
- return 0;
- }
- long[] offsets = response.offsets(topic, partitionId);
- return offsets[0];
- }
-
- public static void shutdown() {
- for (SimpleConsumer simpleConsumer : consumerCache.values()) {
- simpleConsumer.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
deleted file mode 100644
index ee5bb20..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.util;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-/**
- */
-public final class KafkaUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class);
-
- private static final int MAX_RETRY_TIMES = 6;
-
- private KafkaUtils() {
- }
-
- public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- if (partitionMetadata != null) {
- if (partitionMetadata.errorCode() != 0) {
- logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
- }
- return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
- } else {
- return null;
- }
- }
-
- private static void sleep(int retryTimes) {
- int seconds = (int) Math.pow(2, retryTimes);
- logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds");
- try {
- Thread.sleep(seconds * 1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
- final String topic = kafkaClusterConfig.getTopic();
- int retry = 0;
- while (retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds
- final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
- if (leadBroker == null) {
- logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
- sleep(retry++);
- continue;
- }
- final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
- if (response.errorCode(topic, partitionId) != 0) {
- logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
- sleep(retry++);
- continue;
- }
- final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator();
- if (!iterator.hasNext()) {
- logger.warn("messageSet is empty");
- sleep(retry++);
- continue;
- }
- return iterator.next();
- }
- throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
- }
-
- public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamingParser streamingParser) {
- Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
- final String topic = kafkaClusterConfig.getTopic();
-
- logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));
- final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamingParser);
- logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
- return result;
- }
-
- public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
- final String topic = kafkaClusterConfig.getTopic();
- final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
- final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
- final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
- return Pair.newPair(earliestOffset, latestOffset);
- }
-
- private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamingParser streamingParser) {
- Map<Long, Long> cache = Maps.newHashMap();
-
- while (startOffset < endOffset) {
- long midOffset = startOffset + ((endOffset - startOffset) >> 1);
- long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamingParser, cache);
- long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamingParser, cache);
- long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamingParser, cache);
- // hard to ensure these 2 conditions
- // Preconditions.checkArgument(startTimestamp <= midTimestamp);
- // Preconditions.checkArgument(midTimestamp <= endTimestamp);
- if (startTimestamp >= targetTimestamp) {
- return startOffset;
- }
- if (endTimestamp <= targetTimestamp) {
- return endOffset;
- }
- if (targetTimestamp == midTimestamp) {
- return midOffset;
- } else if (targetTimestamp < midTimestamp) {
- endOffset = midOffset - 1;
- continue;
- } else {
- startOffset = midOffset + 1;
- continue;
- }
- }
- return startOffset;
- }
-
- private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser, Map<Long, Long> cache) {
- if (cache.containsKey(offset)) {
- return cache.get(offset);
- } else {
- long t = getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamingParser);
- cache.put(offset, t);
- return t;
- }
- }
-
- public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser) {
- final String topic = kafkaClusterConfig.getTopic();
- final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
- final ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
- streamingMessage.setOffset(messageAndOffset.offset());
- logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
- return streamingMessage.getTimestamp();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
index fb33059..5a52b61 100644
--- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
@@ -20,7 +20,8 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import com.fasterxml.jackson.databind.type.MapType;
@@ -32,7 +33,6 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 1d33071..23e7239 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -41,10 +41,6 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
deleted file mode 100644
index 9adaf24..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.storage.hbase.steps;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class HBaseStreamingOutput implements IStreamingOutput {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
-
- @Override
- public ICuboidWriter getCuboidWriter(IBuildable buildable) {
- try {
- CubeSegment cubeSegment = (CubeSegment) buildable;
-
- final HTableInterface hTable;
- hTable = createHTable(cubeSegment);
- List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
- cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
- cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
- return new CompoundCuboidWriter(cuboidWriters);
- } catch (IOException e) {
- throw new RuntimeException("failed to get ICuboidWriter", e);
- }
- }
-
- @Override
- public void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult) {
- try {
- CubeSegment cubeSegment = (CubeSegment) buildable;
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final Configuration conf = HadoopUtil.getCurrentConfiguration();
- final Path outputPath = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString());
- CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
- FSDataInputStream inputStream = null;
- try {
- inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME));
- ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis());
- } finally {
- IOUtils.closeQuietly(inputStream);
- FileSystem.getLocal(conf).delete(outputPath, true);
- }
- } catch (IOException e) {
- throw new RuntimeException("failed to write sampling result", e);
- }
- }
-
- private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException {
- final String hTableName = cubeSegment.getStorageLocationIdentifier();
- CubeHTableUtil.createHTable(cubeSegment, null);
- final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
- logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
- return hTable;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index f4fb308..9cb135a 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -35,8 +35,8 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.exception.PersistentException;
[3/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
deleted file mode 100644
index 271bf41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
-import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class StreamingManager {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
-
- // static cached instances
- private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
-
- public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
- private KylinConfig config;
-
- // name ==> StreamingConfig
- private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
-
- public static void clearCache() {
- CACHE.clear();
- }
-
- private StreamingManager(KylinConfig config) throws IOException {
- this.config = config;
- this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
-
- // touch lower level metadata before registering my listener
- reloadAllStreaming();
- Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
- }
-
- private class StreamingSyncListener extends Broadcaster.Listener {
- @Override
- public void onClearAll(Broadcaster broadcaster) throws IOException {
- clearCache();
- }
-
- @Override
- public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
- if (event == Event.DROP)
- removeStreamingLocal(cacheKey);
- else
- reloadStreamingConfigLocal(cacheKey);
- }
- }
-
- private ResourceStore getStore() {
- return ResourceStore.getStore(this.config);
- }
-
- public static StreamingManager getInstance(KylinConfig config) {
- StreamingManager r = CACHE.get(config);
- if (r != null) {
- return r;
- }
-
- synchronized (StreamingManager.class) {
- r = CACHE.get(config);
- if (r != null) {
- return r;
- }
- try {
- r = new StreamingManager(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
- return r;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
- }
- }
- }
-
- private static String formatStreamingConfigPath(String name) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
- }
-
- private static String formatStreamingOutputPath(String streaming, int partition) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
- }
-
- private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
- }
-
- public StreamingConfig getStreamingConfig(String name) {
- return streamingMap.get(name);
- }
-
- public List<StreamingConfig> listAllStreaming() {
- return new ArrayList<>(streamingMap.values());
- }
-
- /**
- * Reload StreamingConfig from resource store. It will be triggered by a desc
- * update event.
- *
- * @param name
- * @throws IOException
- */
- public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
-
- // Save Source
- String path = StreamingConfig.concatResourcePath(name);
-
- // Reload the StreamingConfig
- StreamingConfig ndesc = loadStreamingConfigAt(path);
-
- // Here replace the old one
- streamingMap.putLocal(ndesc.getName(), ndesc);
- return ndesc;
- }
-
- // remove streamingConfig
- public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
- String path = streamingConfig.getResourcePath();
- getStore().deleteResource(path);
- streamingMap.remove(streamingConfig.getName());
- }
-
- public StreamingConfig getConfig(String name) {
- name = name.toUpperCase();
- return streamingMap.get(name);
- }
-
- public void removeStreamingLocal(String streamingName) {
- streamingMap.removeLocal(streamingName);
- }
-
- /**
- * Update CubeDesc with the input. Broadcast the event into cluster
- *
- * @param desc
- * @return
- * @throws IOException
- */
- public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
- // Validate CubeDesc
- if (desc.getUuid() == null || desc.getName() == null) {
- throw new IllegalArgumentException("SteamingConfig Illegal.");
- }
- String name = desc.getName();
- if (!streamingMap.containsKey(name)) {
- throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
- }
-
- // Save Source
- String path = desc.getResourcePath();
- getStore().putResource(path, desc, STREAMING_SERIALIZER);
-
- // Reload the StreamingConfig
- StreamingConfig ndesc = loadStreamingConfigAt(path);
- // Here replace the old one
- streamingMap.put(ndesc.getName(), desc);
-
- return ndesc;
- }
-
- public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
- if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
- throw new IllegalArgumentException();
- }
-
- if (streamingMap.containsKey(streamingConfig.getName()))
- throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
-
- String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
- getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
- streamingMap.put(streamingConfig.getName(), streamingConfig);
- return streamingConfig;
- }
-
- private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
- ResourceStore store = getStore();
- StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
-
- if (StringUtils.isBlank(streamingDesc.getName())) {
- throw new IllegalStateException("StreamingConfig name must not be blank");
- }
- return streamingDesc;
- }
-
- private void reloadAllStreaming() throws IOException {
- ResourceStore store = getStore();
- logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
-
- streamingMap.clear();
-
- List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
- for (String path : paths) {
- StreamingConfig streamingConfig;
- try {
- streamingConfig = loadStreamingConfigAt(path);
- } catch (Exception e) {
- logger.error("Error loading streaming desc " + path, e);
- continue;
- }
- if (path.equals(streamingConfig.getResourcePath()) == false) {
- logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
- continue;
- }
- if (streamingMap.containsKey(streamingConfig.getName())) {
- logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
- continue;
- }
-
- streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
- }
-
- logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
deleted file mode 100644
index 32030ad..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
- public static void main(String[] args) {
- Preconditions.checkArgument(args[0].equals("monitor"));
-
- int i = 1;
- List<String> receivers = null;
- String host = null;
- String tableName = null;
- String authorization = null;
- String cubeName = null;
- String projectName = "default";
- while (i < args.length) {
- String argName = args[i];
- switch (argName) {
- case "-receivers":
- receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
- break;
- case "-host":
- host = args[++i];
- break;
- case "-tableName":
- tableName = args[++i];
- break;
- case "-authorization":
- authorization = args[++i];
- break;
- case "-cubeName":
- cubeName = args[++i];
- break;
- case "-projectName":
- projectName = args[++i];
- break;
- default:
- throw new RuntimeException("invalid argName:" + argName);
- }
- i++;
- }
- Preconditions.checkArgument(receivers != null && receivers.size() > 0);
- final StreamingMonitor streamingMonitor = new StreamingMonitor();
- if (tableName != null) {
- logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
- Preconditions.checkNotNull(host);
- Preconditions.checkNotNull(authorization);
- Preconditions.checkNotNull(tableName);
- streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
- }
- if (cubeName != null) {
- logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
- streamingMonitor.checkCube(receivers, cubeName, host);
- }
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
deleted file mode 100644
index 1d66b41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cli;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
-import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
- public static void main(String[] args) {
- try {
- Preconditions.checkArgument(args[0].equals("streaming"));
- Preconditions.checkArgument(args[1].equals("start"));
-
- int i = 2;
- BootstrapConfig bootstrapConfig = new BootstrapConfig();
- while (i < args.length) {
- String argName = args[i];
- switch (argName) {
- case "-start":
- bootstrapConfig.setStart(Long.parseLong(args[++i]));
- break;
- case "-end":
- bootstrapConfig.setEnd(Long.parseLong(args[++i]));
- break;
- case "-cube":
- bootstrapConfig.setCubeName(args[++i]);
- break;
- case "-fillGap":
- bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
- break;
- case "-maxFillGapRange":
- bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i]));
- break;
- default:
- logger.warn("ignore this arg:" + argName);
- }
- i++;
- }
- if (bootstrapConfig.isFillGap()) {
- final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
- logger.info("all gaps:" + StringUtils.join(gaps, ","));
- for (Pair<Long, Long> gap : gaps) {
- List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange());
- for (Pair<Long, Long> splitGap : splitGaps) {
- logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
- startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond());
- logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond());
- }
- }
- } else {
- startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
- logger.info("streaming process finished, exit with 0");
- System.exit(0);
- }
- } catch (Exception e) {
- printArgsError(args);
- logger.error("error start streaming", e);
- System.exit(-1);
- }
- }
-
- private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) {
- List<Pair<Long, Long>> gaps = Lists.newArrayList();
- Long startTime = gap.getFirst();
-
- while (startTime < gap.getSecond()) {
- Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange;
- gaps.add(Pair.newPair(startTime, endTime));
- startTime = endTime;
- }
-
- return gaps;
- }
-
- private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
- final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
- runnable.run();
- }
-
- private static void printArgsError(String[] args) {
- logger.warn("invalid args:" + StringUtils.join(args, " "));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
deleted file mode 100644
index 350a5f8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.cube;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
-import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingCubeBuilder implements StreamingBatchBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingCubeBuilder.class);
-
- private final String cubeName;
- private int processedRowCount = 0;
-
- public StreamingCubeBuilder(String cubeName) {
- this.cubeName = cubeName;
- }
-
- @Override
- public void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter) {
- try {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
-
- LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
- final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter));
- processedRowCount = streamingBatch.getMessages().size();
- for (StreamingMessage streamingMessage : streamingBatch.getMessages()) {
- blockingQueue.put(streamingMessage.getData());
- }
- blockingQueue.put(Collections.<String> emptyList());
- future.get();
- cuboidWriter.flush();
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
- } catch (IOException e) {
- throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
- } finally {
- try {
- cuboidWriter.close();
- } catch (IOException e) {
- throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
- }
- }
- }
-
- @Override
- public IBuildable createBuildable(StreamingBatch streamingBatch) {
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- try {
- CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
- segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
- segment.setInputRecords(streamingBatch.getMessages().size());
- segment.setLastBuildTime(System.currentTimeMillis());
- return segment;
- } catch (IOException e) {
- throw new RuntimeException("failed to create IBuildable", e);
- }
- }
-
- @Override
- public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
- final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
- long start = System.currentTimeMillis();
-
- final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamingMessage input) {
- return input.getData();
- }
- }));
- logger.info(String.format("sampling of %d messages cost %d ms", streamingBatch.getMessages().size(), (System.currentTimeMillis() - start)));
- return samplingResult;
- }
-
- @Override
- public Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable) {
- final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
- final Map<TblColRef, Dictionary<String>> dictionaryMap;
- try {
- dictionaryMap = CubingUtils.buildDictionary(cubeInstance, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamingMessage input) {
- return input.getData();
- }
- }));
- Map<TblColRef, Dictionary<String>> realDictMap = CubingUtils.writeDictionary((CubeSegment) buildable, dictionaryMap, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
- return realDictMap;
- } catch (IOException e) {
- throw new RuntimeException("failed to build dictionary", e);
- }
- }
-
- @Override
- public void commit(IBuildable buildable) {
- CubeSegment cubeSegment = (CubeSegment) buildable;
- cubeSegment.setStatus(SegmentStatusEnum.READY);
- cubeSegment.setInputRecords(processedRowCount);
- CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance());
- cubeBuilder.setToUpdateSegs(cubeSegment);
- try {
- CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cubeBuilder);
- } catch (IOException e) {
- throw new RuntimeException("failed to update CubeSegment", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
deleted file mode 100644
index fba664d..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.diagnose;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.FileUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class StreamingLogAnalyzer {
- public static void main(String[] args) {
- int errorFileCount = 0;
- List<Long> ellapsedTimes = Lists.newArrayList();
-
- String patternStr = "(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})";
- Pattern pattern = Pattern.compile(patternStr);
-
- SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
-
- Preconditions.checkArgument(args.length == 1, "Usage: StreamingLogAnalyzer streaming_logs_folder");
- for (File file : FileUtils.listFiles(new File(args[0]), new String[] { "log" }, false)) {
- System.out.println("Processing file " + file.toString());
-
- long startTime = 0;
- long endTime = 0;
- try {
- List<String> contents = Files.readAllLines(file.toPath(), Charset.defaultCharset());
- for (int i = 0; i < contents.size(); ++i) {
- Matcher m = pattern.matcher(contents.get(i));
- if (m.find()) {
- startTime = format.parse("20" + m.group(1)).getTime();
- break;
- }
- }
-
- for (int i = contents.size() - 1; i >= 0; --i) {
- Matcher m = pattern.matcher(contents.get(i));
- if (m.find()) {
- endTime = format.parse("20" + m.group(1)).getTime();
- break;
- }
- }
-
- if (startTime == 0 || endTime == 0) {
- throw new RuntimeException("start time or end time is not found");
- }
-
- if (endTime - startTime < 60000) {
- System.out.println("Warning: this job took less than one minute!!!! " + file.toString());
- }
-
- elapsedTimes.add(endTime - startTime);
-
- } catch (Exception e) {
- System.out.println("Exception when processing log file " + file.toString());
- System.out.println(e);
- errorFileCount++;
- }
- }
-
- System.out.println("Totally error files count " + errorFileCount);
- System.out.println("Totally normal files processed " + ellapsedTimes.size());
-
- long sum = 0;
- for (Long x : ellapsedTimes) {
- sum += x;
- }
- System.out.println("Avg build time " + (sum / ellapsedTimes.size()));
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
deleted file mode 100644
index 55252c4..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
- public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
- String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
- StringBuilder stringBuilder = new StringBuilder();
- String url = host + "/kylin/api/query";
- PostMethod request = new PostMethod(url);
- try {
-
- request.addRequestHeader("Authorization", "Basic " + authorization);
- request.addRequestHeader("Content-Type", "application/json");
- String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
- request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
- int statusCode = new HttpClient().executeMethod(request);
- String msg = Bytes.toString(request.getResponseBody());
- stringBuilder.append("host:").append(host).append("\n");
- stringBuilder.append("query:").append(query).append("\n");
- stringBuilder.append("statusCode:").append(statusCode).append("\n");
- if (statusCode == 200) {
- title += "succeed";
- final HashMap<?, ?> hashMap = JsonUtil.readValue(msg, HashMap.class);
- stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
- stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
- } else {
- title += "failed";
- stringBuilder.append("response:").append(msg).append("\n");
- }
- } catch (Exception e) {
- final StringWriter out = new StringWriter();
- e.printStackTrace(new PrintWriter(out));
- title += "failed";
- stringBuilder.append(out.toString());
- } finally {
- request.releaseConnection();
- }
- logger.info("title:" + title);
- logger.info("content:" + stringBuilder.toString());
- sendMail(receivers, title, stringBuilder.toString());
- }
-
- public static final List<Pair<Long, Long>> findGaps(String cubeName) {
- List<CubeSegment> segments = getSortedReadySegments(cubeName);
- List<Pair<Long, Long>> gaps = Lists.newArrayList();
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getDateRangeEnd() == second.getDateRangeStart()) {
- continue;
- } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
- gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
- }
- }
- return gaps;
- }
-
- private static List<CubeSegment> getSortedReadySegments(String cubeName) {
- final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- Preconditions.checkNotNull(cube);
- final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
- logger.info("totally " + segments.size() + " cubeSegments");
- Collections.sort(segments);
- return segments;
- }
-
- public static final List<Pair<String, String>> findOverlaps(String cubeName) {
- List<CubeSegment> segments = getSortedReadySegments(cubeName);
- List<Pair<String, String>> overlaps = Lists.newArrayList();
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getDateRangeEnd() == second.getDateRangeStart()) {
- continue;
- } else {
- overlaps.add(Pair.newPair(first.getName(), second.getName()));
- }
- }
- return overlaps;
- }
-
- public void checkCube(List<String> receivers, String cubeName, String host) {
- final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- if (cube == null) {
- logger.info("cube:" + cubeName + " does not exist");
- return;
- }
- List<Pair<Long, Long>> gaps = findGaps(cubeName);
- List<Pair<String, String>> overlaps = Lists.newArrayList();
- StringBuilder content = new StringBuilder();
- if (!gaps.isEmpty()) {
- content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
- @Nullable
- @Override
- public String apply(Pair<Long, Long> input) {
- return parseInterval(input);
- }
- }), "\n")).append("\n");
- }
- if (!overlaps.isEmpty()) {
- content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
- }
- if (content.length() > 0) {
- logger.info(content.toString());
- sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
- } else {
- logger.info("no gaps or overlaps");
- }
- }
-
- private String parseInterval(Pair<Long, Long> interval) {
- return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
- }
-
- private void sendMail(List<String> receivers, String title, String content) {
- final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
- mailService.sendMail(receivers, title, content, false);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
deleted file mode 100644
index 5790bc1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming.util;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TODO: like MRUtil, use a factory pattern so the implementations are configurable
- */
-public class StreamingUtils {
-
- public static IStreamingInput getStreamingInput() {
- return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput");
- }
-
- public static IStreamingOutput getStreamingOutput() {
- return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
- }
-
- public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) {
- Preconditions.checkNotNull(realizationName);
- if (realizationType == RealizationType.CUBE) {
- return new StreamingCubeBuilder(realizationName);
- } else {
- throw new UnsupportedOperationException("not implemented yet");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c30abc0..a47fcde 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -44,8 +44,8 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09ef0e8..72e4069 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,11 +225,6 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-spark</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1017,7 +1012,6 @@
<module>core-job</module>
<module>core-storage</module>
<module>engine-mr</module>
- <module>engine-streaming</module>
<module>engine-spark</module>
<module>source-hive</module>
<module>source-kafka</module>
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index f3374c3..a5fb874 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.rest.exception.BadRequestException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index c4af5f4..34cc57f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index abf0638..170c395 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e49e882..7310d9c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.springframework.beans.factory.annotation.Autowired;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index d4cdfd5..e2100c4 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -43,12 +43,6 @@
<artifactId>kylin-core-common</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
deleted file mode 100644
index 6981096..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- */
-class ByteBufferBackedInputStream extends InputStream {
-
- private ByteBuffer buf;
-
- public ByteBufferBackedInputStream(ByteBuffer buf) {
- this.buf = buf;
- }
-
- @Override
- public int read() throws IOException {
- if (!buf.hasRemaining()) {
- return -1;
- }
- return buf.get() & 0xFF;
- }
-
- @Override
- public int read(byte[] bytes, int off, int len) throws IOException {
- if (!buf.hasRemaining()) {
- return -1;
- }
-
- len = Math.min(len, buf.remaining());
- buf.get(bytes, off, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index d039583..208c0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -20,7 +20,7 @@ package org.apache.kylin.source.kafka;
import com.google.common.collect.Lists;
import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
deleted file mode 100644
index 78a67c2..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.base.Function;
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-import javax.annotation.Nullable;
-
-@SuppressWarnings("unused")
-public class KafkaStreamingInput implements IStreamingInput {
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
-
- @Override
- public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) {
- if (realizationType != RealizationType.CUBE) {
- throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
- }
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName);
- final String streaming = cube.getFactTable();
- final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
- final StreamingConfig streamingConfig = streamingManager.getConfig(streaming);
- if (streamingConfig == null) {
- throw new IllegalArgumentException("Table " + streaming + " is not a streaming table.");
- }
- if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) {
- logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
-
- try {
- final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
- final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
- List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
-
- final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
- final ExecutorService executorService = Executors.newCachedThreadPool();
- final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
- for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
- for (int i = 0; i < partitionCount; ++i) {
- final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
- final Future<List<StreamingMessage>> future = executorService.submit(producer);
- futures.add(future);
- }
- }
- List<StreamingMessage> messages = Lists.newLinkedList();
- for (Future<List<StreamingMessage>> future : futures) {
- try {
- messages.addAll(future.get());
- } catch (InterruptedException e) {
- logger.warn("this thread should not be interrupted, just ignore", e);
- continue;
- } catch (ExecutionException e) {
- throw new RuntimeException("error when get StreamingMessages", e.getCause());
- }
- }
- final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
- logger.info("finish to get streaming batch, total message count:" + messages.size());
- return new StreamingBatch(messages, timeRange);
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("failed to create instance of StreamingParser", e);
- }
- } else {
- throw new IllegalArgumentException("kafka is the only supported streaming type.");
- }
- }
-
- private static class StreamingMessageProducer implements Callable<List<StreamingMessage>> {
-
- private final KafkaClusterConfig kafkaClusterConfig;
- private final int partitionId;
- private final StreamingParser streamingParser;
- private final Pair<Long, Long> timeRange;
- private final long margin;
-
- private List<Broker> replicaBrokers;
-
- StreamingMessageProducer(KafkaClusterConfig kafkaClusterConfig, int partitionId, Pair<Long, Long> timeRange, long margin, StreamingParser streamingParser) {
- this.kafkaClusterConfig = kafkaClusterConfig;
- this.partitionId = partitionId;
- this.streamingParser = streamingParser;
- this.margin = margin;
- this.timeRange = timeRange;
- this.replicaBrokers = kafkaClusterConfig.getBrokers();
- }
-
- private Broker getLeadBroker() {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
- if (partitionMetadata != null) {
- if (partitionMetadata.errorCode() != 0) {
- logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
- }
- replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() {
- @Nullable
- @Override
- public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) {
- return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT);
- }
- });
- BrokerEndPoint leaderEndpoint = partitionMetadata.leader();
-
- return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT);
- } else {
- return null;
- }
- }
-
- @Override
- public List<StreamingMessage> call() throws Exception {
- List<StreamingMessage> result = Lists.newLinkedList();
- try {
- long startTimestamp = timeRange.getFirst() - margin;
- long offset = KafkaUtils.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, partitionId, startTimestamp, streamingParser);
- int fetchRound = 0;
- int consumeMsgCount = 0;
- Broker leadBroker = null;
- String topic = kafkaClusterConfig.getTopic();
- while (true) {
- boolean outOfMargin = false;
- int consumeMsgCountAtBeginning = consumeMsgCount;
- fetchRound++;
-
- if (leadBroker == null) {
- leadBroker = getLeadBroker();
- }
-
- if (leadBroker == null) {
- logger.warn("cannot find lead broker, wait 5s");
- Thread.sleep(5000);
- continue;
- }
-
- logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
- final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
- if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
- Thread.sleep(30000);
- continue;
- }
-
- for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
- offset++;
- consumeMsgCount++;
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
- streamingMessage.setOffset(messageAndOffset.offset());
- if (streamingParser.filter(streamingMessage)) {
- final long timestamp = streamingMessage.getTimestamp();
- if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {
- result.add(streamingMessage);
- } else if (timestamp < timeRange.getSecond() + margin) {
- //do nothing
- } else {
- logger.info("thread:" + Thread.currentThread() + " message timestamp:" + timestamp + " is out of time range:" + timeRange + " margin:" + margin);
- outOfMargin = true;
- break;
- }
- }
- }
- logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
- if (outOfMargin) {
- break;
- }
- if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round
- logger.info("no message consumed this round, wait 30s");
- Thread.sleep(30000);
- }
- }
- } catch (InterruptedException e) {
- logger.warn("this thread should not be interrupted, just stop fetching", e);
- }
- return result;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index e4c702d..633a30c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
deleted file mode 100644
index b1b4011..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.DaemonThreadFactory;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.util.KafkaRequester;
-import org.apache.kylin.source.kafka.util.KafkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.message.MessageAndOffset;
-
-/**
- * Continuously run this as a daemon to discover how "disordered" the kafka queue is.
- * This daemon only stores a digest, so it should not be space-consuming.
- */
-public class KafkaInputAnalyzer extends AbstractApplication {
-
- public class KafkaMessagePuller implements Runnable {
-
- private final String topic;
- private final int partitionId;
- private final KafkaClusterConfig streamingConfig;
- private final LinkedBlockingQueue<StreamingMessage> streamQueue;
- private final StreamingParser streamingParser;
- private final Broker leadBroker;
- private long offset;
-
- protected final Logger logger;
-
- public KafkaMessagePuller(int clusterID, String topic, int partitionId, long startOffset, Broker leadBroker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) {
- this.topic = topic;
- this.partitionId = partitionId;
- this.streamingConfig = kafkaClusterConfig;
- this.offset = startOffset;
- this.logger = LoggerFactory.getLogger(topic + "_cluster_" + clusterID + "_" + partitionId);
- this.streamQueue = new LinkedBlockingQueue<StreamingMessage>(10000);
- this.streamingParser = streamingParser;
- this.leadBroker = leadBroker;
- }
-
- public BlockingQueue<StreamingMessage> getStreamQueue() {
- return streamQueue;
- }
-
- @Override
- public void run() {
- try {
- int consumeMsgCount = 0;
- int fetchRound = 0;
- while (true) {
- int consumeMsgCountAtBeginning = consumeMsgCount;
- fetchRound++;
-
- logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString());
-
- final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
- if (fetchResponse.errorCode(topic, partitionId) != 0) {
- logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
- Thread.sleep(30000);
- continue;
- }
-
- for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
- offset++;
- consumeMsgCount++;
-
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
- streamingMessage.setOffset(messageAndOffset.offset());
- if (streamingParser.filter(streamingMessage)) {
- streamQueue.add(streamingMessage);
- }
-
- }
- logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
-
- if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round
- Thread.sleep(30000);
- }
- }
- } catch (Exception e) {
- logger.error("consumer has encountered an error", e);
- }
- }
-
- }
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming").create("streaming");
- @SuppressWarnings("static-access")
- private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("task to run: 'delay' (latency) or 'disorder' (out-of-order degree)").create("task");
- @SuppressWarnings("static-access")
- private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("field name of the ts").create("tsColName");
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class);
-
- private StreamingParser parser;
- private KafkaConfig kafkaConfig;
-
- private Options options;
-
- public KafkaInputAnalyzer() {
- options = new Options();
- options.addOption(OPTION_STREAMING);
- options.addOption(OPTION_TASK);
- options.addOption(OPTION_TSCOLNAME);
-
- }
-
- private List<BlockingQueue<StreamingMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) {
- List<BlockingQueue<StreamingMessage>> result = Lists.newArrayList();
- for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
- final kafka.cluster.Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId);
- long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig);
- logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId);
- KafkaMessagePuller consumer = new KafkaMessagePuller(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, leadBroker, kafkaClusterConfig, parser);
- Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
- result.add(consumer.getStreamQueue());
- }
- return result;
- }
-
- private List<BlockingQueue<StreamingMessage>> consumeAll(long whichtime) {
- int clusterId = 0;
- final List<BlockingQueue<StreamingMessage>> queues = Lists.newLinkedList();
-
- for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
- final List<BlockingQueue<StreamingMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime);
- queues.addAll(oneClusterQueue);
- logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
- clusterId++;
- }
- return queues;
- }
-
- private void analyzeLatency() throws InterruptedException {
- long[] intervals = new long[] { 1, 5, 60, 300, 1800 };
- final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime());
- final List<TimeHistogram> allHistograms = Lists.newArrayList();
- final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall");
-
- ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
- for (int i = 0; i < allPartitionData.size(); ++i) {
- final int index = i;
- allHistograms.add(new TimeHistogram(intervals, "" + i));
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- StreamingMessage message = allPartitionData.get(index).take();
- long t = message.getTimestamp();
- allHistograms.get(index).processMillis(System.currentTimeMillis() - t);
- overallHistogram.processMillis(System.currentTimeMillis() - t);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- }
-
- while (true) {
- System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()));
-
- for (TimeHistogram histogram : allHistograms) {
- histogram.printStatus();
- }
- overallHistogram.printStatus();
- Thread.sleep(300000);
- }
- }
-
- private void analyzeDisorder() throws InterruptedException {
- final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime());
-
- final List<Long> wallClocks = Lists.newArrayList();
- final List<Long> wallOffset = Lists.newArrayList();
- final List<Long> maxDisorderTime = Lists.newArrayList();
- final List<Long> maxDisorderOffset = Lists.newArrayList();
- final List<Long> processedMessages = Lists.newArrayList();
-
- for (int i = 0; i < allPartitionData.size(); i++) {
- wallClocks.add(0L);
- wallOffset.add(0L);
- maxDisorderTime.add(0L);
- maxDisorderOffset.add(0L);
- processedMessages.add(0L);
- }
-
- ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
- final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size());
- for (int i = 0; i < allPartitionData.size(); ++i) {
- final int index = i;
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- while (true) {
- StreamingMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS);
- if (message == null) {
- System.out.println(String.format("Thread %d is exiting", index));
- break;
- }
- long t = message.getTimestamp();
- long offset = message.getOffset();
- if (t < wallClocks.get(index)) {
- maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index)));
- maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index)));
- } else {
- wallClocks.set(index, t);
- wallOffset.set(index, offset);
- }
- processedMessages.set(index, processedMessages.get(index) + 1);
-
- if (processedMessages.get(index) % 10000 == 1) {
- System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d", //
- index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
- }
- }
-
- System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d", //
- index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- countDownLatch.await();
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
-
- String streaming = optionsHelper.getOptionValue(OPTION_STREAMING);
- String task = optionsHelper.getOptionValue(OPTION_TASK);
- String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
-
- Map<String, String> properties = Maps.newHashMap();
- properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
- kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
- parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
-
- if ("disorder".equalsIgnoreCase(task)) {
- analyzeDisorder();
- } else if ("delay".equalsIgnoreCase(task)) {
- analyzeLatency();
- } else {
- optionsHelper.printUsage(this.getClass().getName(), options);
- }
- }
-
- public static void main(String[] args) {
- KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer();
- analyzer.execute(args);
- }
-}
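
The latency task in the removed KafkaInputAnalyzer above boils down to: take each message off its partition queue, compute System.currentTimeMillis() - message.getTimestamp(), and feed that delay into histograms bounded at 1, 5, 60, 300 and 1800 seconds. The sketch below is a self-contained illustration of that bucketing only; LatencyBuckets is a hypothetical stand-in, not the removed TimeHistogram class.

import java.util.concurrent.atomic.AtomicLongArray;

public class LatencyBuckets {
    private final long[] boundsMillis;    // upper bucket bounds, converted to milliseconds
    private final AtomicLongArray counts; // one extra slot for delays above the last bound

    public LatencyBuckets(long[] boundsSeconds) {
        boundsMillis = new long[boundsSeconds.length];
        for (int i = 0; i < boundsSeconds.length; i++) {
            boundsMillis[i] = boundsSeconds[i] * 1000L;
        }
        counts = new AtomicLongArray(boundsSeconds.length + 1);
    }

    public void processMillis(long latencyMillis) {
        for (int i = 0; i < boundsMillis.length; i++) {
            if (latencyMillis < boundsMillis[i]) {
                counts.incrementAndGet(i);
                return;
            }
        }
        counts.incrementAndGet(boundsMillis.length); // slower than the largest bound
    }

    public void printStatus() {
        for (int i = 0; i < counts.length(); i++) {
            System.out.println("bucket " + i + ": " + counts.get(i));
        }
    }

    public static void main(String[] args) {
        LatencyBuckets h = new LatencyBuckets(new long[] { 1, 5, 60, 300, 1800 });
        long messageTimestamp = System.currentTimeMillis() - 3000L; // pretend the message is 3s old
        h.processMillis(System.currentTimeMillis() - messageTimestamp);
        h.printStatus();
    }
}
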
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
deleted file mode 100644
index 6a456bc..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
-/**
- * Only for verifying Kylin streaming's correctness by comparing against data in the original Kafka topic
- */
-public class KafkaVerify {
-
- public static void main(String[] args) throws IOException {
-
- System.out.println("start");
-
- ObjectMapper mapper = new ObjectMapper();
- JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
- long start = Long.valueOf(args[0]);
- long end = Long.valueOf(args[1]);
- long interval = Long.valueOf(args[2]);
- int bucket = (int) ((end - start + interval - 1) / interval);
-
- long[] qtySum = new long[bucket];
- long qtyTotal = 0;
- long[] counts = new long[bucket];
- long countTotal = 0;
- long processed = 0;
- long minOffset = -1;
- long maxOffset = -1;
-
- try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
- String s;
- while ((s = br.readLine()) != null) {
- // process the line.
- if (++processed % 10000 == 1) {
- System.out.println("processing " + processed);
- }
-
- Map<String, String> root = mapper.readValue(s, mapType);
- String tsStr = root.get("sys_ts");
-
- if (StringUtils.isEmpty(tsStr)) {
- continue;
- }
- long ts = Long.valueOf(tsStr);
- if (ts < start || ts >= end) {
- continue;
- }
-
- if (minOffset == -1) {
- minOffset = processed - 1;
- }
- maxOffset = processed - 1;
-
- long qty = Long.valueOf(root.get("qty"));
- int index = (int) ((ts - start) / interval);
- qtySum[index] += qty;
- qtyTotal += qty;
- counts[index]++;
- countTotal++;
- }
- }
-
- System.out.println("qty sum is " + Arrays.toString(qtySum));
- System.out.println("qty total is " + qtyTotal);
- System.out.println("count is " + Arrays.toString(counts));
- System.out.println("count total is " + countTotal);
- System.out.println("first processed is " + minOffset);
- System.out.println("last processed is " + maxOffset);
- }
-}
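
The removed KafkaVerify reads a plain-text dump of the topic, keeps records whose sys_ts falls inside [start, end), and sums the qty field per fixed-width time bucket. The core of it is a ceiling division for the bucket count and an integer division for the bucket index; the values in the sketch below are placeholders chosen only to show the arithmetic.

public class BucketDemo {
    public static void main(String[] args) {
        long start = 1_000_000L;   // window start in ms (placeholder)
        long end = 1_600_000L;     // window end in ms (placeholder)
        long interval = 100_000L;  // bucket width in ms (placeholder)

        // ceiling division: number of buckets needed to cover [start, end)
        int bucket = (int) ((end - start + interval - 1) / interval);
        long[] qtySum = new long[bucket];

        long ts = 1_234_567L;      // one record's sys_ts (placeholder)
        if (ts >= start && ts < end) {
            int index = (int) ((ts - start) / interval); // which bucket the record lands in
            qtySum[index] += 42;                         // accumulate its qty (placeholder value)
            System.out.println(bucket + " buckets; the sample record lands in bucket " + index);
        }
    }
}
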
[5/6] kylin git commit: KYLIN-2072 further cleanup old streaming code
Posted by sh...@apache.org.
KYLIN-2072 further cleanup old streaming code
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ed643e6b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ed643e6b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ed643e6b
Branch: refs/heads/master
Commit: ed643e6b20e31a4c3a45d72dc8e5ff1287584764
Parents: c67fa74
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 22:16:38 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800
----------------------------------------------------------------------
.../kylin/common/util/StreamingBatch.java | 44 --------------------
.../kylin/common/util/StreamingMessage.java | 3 --
.../org/apache/kylin/engine/EngineFactory.java | 14 +------
.../kylin/engine/IStreamingCubingEngine.java | 26 ------------
4 files changed, 1 insertion(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
deleted file mode 100644
index e000aa6..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.util.List;
-
-/**
- */
-public final class StreamingBatch {
-
- private final List<StreamingMessage> messages;
-
- private final Pair<Long, Long> timeRange;
-
- public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
- this.messages = messages;
- this.timeRange = timeRange;
- }
-
- public List<StreamingMessage> getMessages() {
- return messages;
- }
-
- public Pair<Long, Long> getTimeRange() {
- return timeRange;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index 53ab195..981c8a8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -18,7 +18,6 @@
package org.apache.kylin.common.util;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -34,8 +33,6 @@ public class StreamingMessage {
private Map<String, Object> params;
- public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
-
public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
this.data = data;
this.offset = offset;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 7044a3e..acaa7da 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -31,23 +31,15 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
public class EngineFactory {
private static ImplementationSwitch<IBatchCubingEngine> batchEngines;
- private static ImplementationSwitch<IStreamingCubingEngine> streamingEngines;
static {
Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getJobEngines();
- batchEngines = new ImplementationSwitch<IBatchCubingEngine>(impls, IBatchCubingEngine.class);
-
- impls.clear();
- streamingEngines = new ImplementationSwitch<IStreamingCubingEngine>(impls, IStreamingCubingEngine.class); // TODO
+ batchEngines = new ImplementationSwitch<>(impls, IBatchCubingEngine.class);
}
public static IBatchCubingEngine batchEngine(IEngineAware aware) {
return batchEngines.get(aware.getEngineType());
}
- public static IStreamingCubingEngine streamingEngine(IEngineAware aware) {
- return streamingEngines.get(aware.getEngineType());
- }
-
/** Mark deprecated to indicate for test purpose only */
@Deprecated
public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
@@ -68,8 +60,4 @@ public class EngineFactory {
return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
}
- public static Runnable createStreamingCubingBuilder(CubeSegment seg) {
- return streamingEngine(seg).createStreamingCubingBuilder(seg);
- }
-
}
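
After this cleanup, EngineFactory keeps only the batch path: it asks ImplementationSwitch for the IBatchCubingEngine registered under aware.getEngineType(). The sketch below illustrates that integer-keyed dispatch pattern in isolation; MiniSwitch, the type codes and the engine classes are hypothetical stand-ins, not Kylin's ImplementationSwitch.

import java.util.HashMap;
import java.util.Map;

public class DispatchDemo {

    interface Engine { String name(); }
    static class MrEngine implements Engine { public String name() { return "MapReduce cubing engine"; } }
    static class SparkEngine implements Engine { public String name() { return "Spark cubing engine"; } }

    // Hypothetical miniature of the "engine type -> implementation" switch.
    static class MiniSwitch<T> {
        private final Map<Integer, T> impls = new HashMap<>();

        MiniSwitch<T> register(int type, T impl) {
            impls.put(type, impl);
            return this;
        }

        T get(int type) {
            T impl = impls.get(type);
            if (impl == null)
                throw new IllegalArgumentException("No implementation registered for engine type " + type);
            return impl;
        }
    }

    public static void main(String[] args) {
        MiniSwitch<Engine> engines = new MiniSwitch<Engine>()
                .register(2, new MrEngine())      // type codes are placeholders
                .register(4, new SparkEngine());

        // A cube desc carries an engine type; the factory resolves it to an implementation.
        System.out.println(engines.get(2).name());
    }
}
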
http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
deleted file mode 100644
index cec57a7..0000000
--- a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.cube.CubeSegment;
-
-public interface IStreamingCubingEngine {
-
- public Runnable createStreamingCubingBuilder(CubeSegment seg);
-}
[4/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
Posted by sh...@apache.org.
KYLIN-2072 Cleanup old streaming code
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5aee0226
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5aee0226
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5aee0226
Branch: refs/heads/master
Commit: 5aee022612c6fa40c41e8c00063714b79b6d5237
Parents: cb2b12b
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 13:10:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800
----------------------------------------------------------------------
assembly/pom.xml | 4 -
.../kylin/job/streaming/KafkaDataLoader.java | 79 ----
build/bin/cleanup_streaming_files.sh | 42 --
build/bin/kylin.sh | 61 ---
build/bin/streaming_build.sh | 33 --
build/bin/streaming_check.sh | 29 --
build/bin/streaming_fillgap.sh | 26 --
build/bin/streaming_rolllog.sh | 29 --
.../metadata/streaming/StreamingConfig.java | 85 ++++
.../metadata/streaming/StreamingManager.java | 248 ++++++++++++
.../.settings/org.eclipse.core.resources.prefs | 6 -
.../.settings/org.eclipse.jdt.core.prefs | 386 -------------------
.../.settings/org.eclipse.jdt.ui.prefs | 7 -
engine-streaming/pom.xml | 121 ------
.../kylin/engine/streaming/BootstrapConfig.java | 71 ----
.../kylin/engine/streaming/IStreamingInput.java | 30 --
.../engine/streaming/IStreamingOutput.java | 34 --
.../streaming/OneOffStreamingBuilder.java | 71 ----
.../engine/streaming/StreamingBatchBuilder.java | 43 ---
.../kylin/engine/streaming/StreamingConfig.java | 85 ----
.../engine/streaming/StreamingManager.java | 248 ------------
.../kylin/engine/streaming/cli/MonitorCLI.java | 88 -----
.../engine/streaming/cli/StreamingCLI.java | 114 ------
.../streaming/cube/StreamingCubeBuilder.java | 168 --------
.../diagnose/StreamingLogAnalyzer.java | 96 -----
.../streaming/monitor/StreamingMonitor.java | 172 ---------
.../engine/streaming/util/StreamingUtils.java | 51 ---
.../kylin/provision/BuildCubeWithStream.java | 4 +-
pom.xml | 6 -
.../rest/controller/StreamingController.java | 2 +-
.../kylin/rest/controller/TableController.java | 2 +-
.../apache/kylin/rest/service/BasicService.java | 2 +-
.../kylin/rest/service/StreamingService.java | 2 +-
source-kafka/pom.xml | 6 -
.../kafka/ByteBufferBackedInputStream.java | 52 ---
.../apache/kylin/source/kafka/KafkaSource.java | 2 +-
.../kylin/source/kafka/KafkaStreamingInput.java | 227 -----------
.../source/kafka/TimedJsonStreamParser.java | 1 +
.../kafka/diagnose/KafkaInputAnalyzer.java | 312 ---------------
.../source/kafka/diagnose/KafkaVerify.java | 101 -----
.../source/kafka/diagnose/TimeHistogram.java | 85 ----
.../kafka/util/ByteBufferBackedInputStream.java | 52 +++
.../kylin/source/kafka/util/KafkaRequester.java | 191 ---------
.../kylin/source/kafka/util/KafkaUtils.java | 173 ---------
.../test/java/TimedJsonStreamParserTest.java | 4 +-
storage-hbase/pom.xml | 4 -
.../hbase/steps/HBaseStreamingOutput.java | 98 -----
.../apache/kylin/tool/CubeMetaExtractor.java | 4 +-
48 files changed, 397 insertions(+), 3360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0c80afc..e6f83a8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -47,10 +47,6 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 454f6cf..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-/**
- * Load prepared data into Kafka (for test use)
- */
-public class KafkaDataLoader extends StreamDataLoader {
- List<KafkaClusterConfig> kafkaClusterConfigs;
-
- public KafkaDataLoader(KafkaConfig kafkaConfig) {
- super(kafkaConfig);
- this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
- }
-
- public void loadIntoKafka(List<String> messages) {
-
- KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
- String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
- @Nullable
- @Override
- public String apply(BrokerConfig brokerConfig) {
- return brokerConfig.getHost() + ":" + brokerConfig.getPort();
- }
- }), ",");
- Properties props = new Properties();
- props.put("metadata.broker.list", brokerList);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
- props.put("retry.backoff.ms", "1000");
-
- ProducerConfig config = new ProducerConfig(props);
-
- Producer<String, String> producer = new Producer<String, String>(config);
-
- List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
- for (int i = 0; i < messages.size(); ++i) {
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
- keyedMessages.add(keyedMessage);
- }
- producer.send(keyedMessages);
- producer.close();
- }
-
-}
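
The removed KafkaDataLoader built its broker list from the cluster config and pushed keyed test messages through the 0.8-era kafka.javaapi producer. A stripped-down sketch of that producer setup follows; the broker address, topic and payloads are placeholders, and the old Kafka producer classes must be on the classpath.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LoadTestMessages {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");            // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));

        List<KeyedMessage<String, String>> batch = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // key = message index, value = a small JSON payload (placeholder)
            batch.add(new KeyedMessage<>("test_topic", String.valueOf(i), "{\"msg\":" + i + "}"));
        }

        producer.send(batch); // same batched send the removed loadIntoKafka used
        producer.close();
    }
}
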
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/cleanup_streaming_files.sh
----------------------------------------------------------------------
diff --git a/build/bin/cleanup_streaming_files.sh b/build/bin/cleanup_streaming_files.sh
deleted file mode 100644
index 9b31a4f..0000000
--- a/build/bin/cleanup_streaming_files.sh
+++ /dev/null
@@ -1,42 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-if [ $# != 1 ]
-then
- echo 'invalid input'
- exit -1
-fi
-
-cd $KYLIN_HOME/logs
-
-for pidfile in `find -L . -name "$1_1*"`
-do
- pidfile=`echo "$pidfile" | cut -c 3-`
- echo "pidfile:$pidfile"
- pid=`cat $pidfile`
- if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ]
- then
- echo "pid:$pid still running"
- else
- echo "pid:$pid not running, try to delete files"
- echo $pidfile | xargs rm
- echo "streaming_$pidfile.log" | xargs rm
- fi
-done
-
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index e767492..039be9f 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -139,67 +139,6 @@ then
exit 1
fi
-# streaming command
-elif [ "$1" == "streaming" ]
-then
- if [ $# -lt 4 ]
- then
- echo "invalid input args $@"
- exit -1
- fi
- if [ "$2" == "start" ]
- then
- retrieveDependency
- source ${dir}/find-kafka-dependency.sh
-
- # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
- hbase ${KYLIN_EXTRA_START_OPTS} \
- -Dlog4j.configuration=kylin-log4j.properties\
- -Dkylin.hive.dependency=${hive_dependency} \
- -Dkylin.kafka.dependency=${kafka_dependency} \
- -Dkylin.hbase.dependency=${hbase_dependency} \
- org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
- echo "streaming started name: $3 id: $4"
- exit 0
- elif [ "$2" == "stop" ]
- then
- if [ ! -f "${KYLIN_HOME}/$3_$4" ]
- then
- echo "streaming is not running, please check"
- exit 1
- fi
- pid=`cat ${KYLIN_HOME}/$3_$4`
- if [ "$pid" = "" ]
- then
- echo "streaming is not running, please check"
- exit 1
- else
- echo "stopping streaming:$pid"
- kill $pid
- fi
- rm ${KYLIN_HOME}/$3_$4
- exit 0
- else
- echo
- fi
-
-# monitor command
-elif [ "$1" == "monitor" ]
-then
- echo "monitor job"
-
- retrieveDependency
- source ${dir}/find-kafka-dependency.sh
-
- # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
- hbase ${KYLIN_EXTRA_START_OPTS} \
- -Dlog4j.configuration=kylin-log4j.properties\
- -Dkylin.hive.dependency=${hive_dependency} \
- -Dkylin.kafka.dependency=${kafka_dependency} \
- -Dkylin.hbase.dependency=${hbase_dependency} \
- org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
- exit 0
-
elif [ "$1" = "version" ]
then
exec hbase -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.common.KylinVersion
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
deleted file mode 100644
index ed19036..0000000
--- a/build/bin/streaming_build.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-CUBE=$1
-INTERVAL=$2
-DELAY=$3
-CURRENT_TIME_IN_SECOND=`date +%s`
-CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
-START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
-END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
-
-ID="$START"_"$END"
-echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE}
\ No newline at end of file
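
The removed streaming_build.sh derives each build window by snapping the current time down to an interval boundary and shifting it back by a fixed delay, so repeated invocations inside one window produce the same START_END id. A small worked example of that arithmetic, with illustrative values:

public class WindowDemo {
    public static void main(String[] args) {
        long interval = 300_000L; // 5-minute batches (illustrative)
        long delay = 60_000L;     // allow 1 minute for late data (illustrative)
        long now = System.currentTimeMillis();

        long start = now - now % interval - delay; // snap to the interval boundary, then shift back
        long end = start + interval;

        System.out.println("building segment " + start + "_" + end);
    }
}
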
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_check.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh
deleted file mode 100644
index fef0139..0000000
--- a/build/bin/streaming_check.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-receivers=$1
-host=$2
-tablename=$3
-authorization=$4
-projectname=$5
-cubename=$6
-sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
deleted file mode 100644
index c67809a..0000000
--- a/build/bin/streaming_fillgap.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-cube=$1
-
-cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_rolllog.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_rolllog.sh b/build/bin/streaming_rolllog.sh
deleted file mode 100644
index 8018eb8..0000000
--- a/build/bin/streaming_rolllog.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-KYLIN_LOG_HOME=${KYLIN_HOME}/logs
-cd ${KYLIN_LOG_HOME}
-timestamp=`date +%Y_%m_%d_%H_%M_%S`
-tarfile=logs_archived_at_${timestamp}.tar
-files=`find -L . ! -name '*.tar' -type f -mtime +1` # keep two days' log
-echo ${files} | xargs tar -cvf ${tarfile}
-echo ${files} | xargs rm
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
new file mode 100644
index 0000000..9fd6ede
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class StreamingConfig extends RootPersistentEntity {
+
+ public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+ public static final String STREAMING_TYPE_KAFKA = "kafka";
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("type")
+ private String type = STREAMING_TYPE_KAFKA;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getResourcePath() {
+ return concatResourcePath(name);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public static String concatResourcePath(String name) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+ }
+
+ @Override
+ public StreamingConfig clone() {
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SERIALIZER.serialize(this, new DataOutputStream(baos));
+ return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+ } catch (IOException e) {
+ throw new RuntimeException(e);//in mem, should not happen
+ }
+ }
+
+}
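
The relocated StreamingConfig is a small persisted entity keyed by name. A minimal usage sketch follows; the name is a placeholder, and the printed path depends on ResourceStore.STREAMING_RESOURCE_ROOT, which this diff does not show.

import org.apache.kylin.metadata.streaming.StreamingConfig;

public class StreamingConfigDemo {
    public static void main(String[] args) {
        StreamingConfig cfg = new StreamingConfig();
        cfg.setName("DEFAULT.MY_STREAMING_TABLE");         // placeholder name
        cfg.setType(StreamingConfig.STREAMING_TYPE_KAFKA); // "kafka", the default

        // Resolves to STREAMING_RESOURCE_ROOT + "/DEFAULT.MY_STREAMING_TABLE.json"
        System.out.println(cfg.getResourcePath());

        // clone() round-trips the entity through its JSON serializer
        StreamingConfig copy = cfg.clone();
        System.out.println(copy.getName());
    }
}
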
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
new file mode 100644
index 0000000..8cfe87d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class StreamingManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
+
+ // static cached instances
+ private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+
+ public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+ private KylinConfig config;
+
+ // name ==> StreamingConfig
+ private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ private StreamingManager(KylinConfig config) throws IOException {
+ this.config = config;
+ this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
+
+ // touch lower level metadata before registering my listener
+ reloadAllStreaming();
+ Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
+ }
+
+ private class StreamingSyncListener extends Broadcaster.Listener {
+ @Override
+ public void onClearAll(Broadcaster broadcaster) throws IOException {
+ clearCache();
+ }
+
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+ if (event == Event.DROP)
+ removeStreamingLocal(cacheKey);
+ else
+ reloadStreamingConfigLocal(cacheKey);
+ }
+ }
+
+ private ResourceStore getStore() {
+ return ResourceStore.getStore(this.config);
+ }
+
+ public static StreamingManager getInstance(KylinConfig config) {
+ StreamingManager r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ synchronized (StreamingManager.class) {
+ r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+ try {
+ r = new StreamingManager(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one singleton exist");
+ }
+ return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+ }
+ }
+ }
+
+ private static String formatStreamingConfigPath(String name) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+ }
+
+ private static String formatStreamingOutputPath(String streaming, int partition) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
+ }
+
+ private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+ }
+
+ public StreamingConfig getStreamingConfig(String name) {
+ return streamingMap.get(name);
+ }
+
+ public List<StreamingConfig> listAllStreaming() {
+ return new ArrayList<>(streamingMap.values());
+ }
+
+ /**
+ * Reload a StreamingConfig from the resource store. It will be triggered by a
+ * desc update event.
+ *
+ * @param name
+ * @throws IOException
+ */
+ public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+ // Resolve the resource path from the name
+ String path = StreamingConfig.concatResourcePath(name);
+
+ // Reload the StreamingConfig
+ StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+ // Here replace the old one
+ streamingMap.putLocal(ndesc.getName(), ndesc);
+ return ndesc;
+ }
+
+ // remove streamingConfig
+ public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+ String path = streamingConfig.getResourcePath();
+ getStore().deleteResource(path);
+ streamingMap.remove(streamingConfig.getName());
+ }
+
+ public StreamingConfig getConfig(String name) {
+ name = name.toUpperCase();
+ return streamingMap.get(name);
+ }
+
+ public void removeStreamingLocal(String streamingName) {
+ streamingMap.removeLocal(streamingName);
+ }
+
+ /**
+ * Update StreamingConfig with the input and broadcast the change event into the cluster.
+ *
+ * @param desc
+ * @return
+ * @throws IOException
+ */
+ public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+ // Validate StreamingConfig
+ if (desc.getUuid() == null || desc.getName() == null) {
+ throw new IllegalArgumentException("SteamingConfig Illegal.");
+ }
+ String name = desc.getName();
+ if (!streamingMap.containsKey(name)) {
+ throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
+ }
+
+ // Save Source
+ String path = desc.getResourcePath();
+ getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+ // Reload the StreamingConfig
+ StreamingConfig ndesc = loadStreamingConfigAt(path);
+ // Here replace the old one
+ streamingMap.put(ndesc.getName(), desc);
+
+ return ndesc;
+ }
+
+ public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+ if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
+ throw new IllegalArgumentException();
+ }
+
+ if (streamingMap.containsKey(streamingConfig.getName()))
+ throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
+ String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
+ getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+ streamingMap.put(streamingConfig.getName(), streamingConfig);
+ return streamingConfig;
+ }
+
+ private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
+ ResourceStore store = getStore();
+ StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+ if (StringUtils.isBlank(streamingDesc.getName())) {
+ throw new IllegalStateException("StreamingConfig name must not be blank");
+ }
+ return streamingDesc;
+ }
+
+ private void reloadAllStreaming() throws IOException {
+ ResourceStore store = getStore();
+ logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+ streamingMap.clear();
+
+ List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+ for (String path : paths) {
+ StreamingConfig streamingConfig;
+ try {
+ streamingConfig = loadStreamingConfigAt(path);
+ } catch (Exception e) {
+ logger.error("Error loading streaming desc " + path, e);
+ continue;
+ }
+ if (path.equals(streamingConfig.getResourcePath()) == false) {
+ logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+ continue;
+ }
+ if (streamingMap.containsKey(streamingConfig.getName())) {
+ logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+ continue;
+ }
+
+ streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+ }
+
+ logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+ }
+}
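
Callers obtain the relocated StreamingManager as a per-KylinConfig singleton and go through it for all config CRUD, so the in-memory cache stays in sync with the resource store and changes propagate through the Broadcaster listener registered in the constructor. A usage sketch, assuming a reachable metadata store behind KylinConfig.getInstanceFromEnv(); the config name is a placeholder.

import java.io.IOException;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;

public class StreamingManagerDemo {
    public static void main(String[] args) throws IOException {
        StreamingManager mgr = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());

        StreamingConfig cfg = new StreamingConfig();
        cfg.setName("DEFAULT.MY_STREAMING_TABLE"); // placeholder name

        mgr.saveStreamingConfig(cfg);              // persists under STREAMING_RESOURCE_ROOT and caches it

        for (StreamingConfig c : mgr.listAllStreaming()) {
            System.out.println(c.getName() + " -> " + c.getType());
        }

        mgr.removeStreamingConfig(cfg);            // deletes the resource and evicts the cache entry
    }
}
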
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.core.resources.prefs b/engine-streaming/.settings/org.eclipse.core.resources.prefs
deleted file mode 100644
index 29abf99..0000000
--- a/engine-streaming/.settings/org.eclipse.core.resources.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-eclipse.preferences.version=1
-encoding//src/main/java=UTF-8
-encoding//src/main/resources=UTF-8
-encoding//src/test/java=UTF-8
-encoding//src/test/resources=UTF-8
-encoding/<project>=UTF-8
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.core.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.core.prefs b/engine-streaming/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 5aaaf1e..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,386 +0,0 @@
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.annotation.inheritNullAnnotations=disabled
-org.eclipse.jdt.core.compiler.annotation.missingNonNullByDefaultAnnotation=ignore
-org.eclipse.jdt.core.compiler.annotation.nonnull=org.eclipse.jdt.annotation.NonNull
-org.eclipse.jdt.core.compiler.annotation.nonnull.secondary=
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annotation.NonNullByDefault
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable
-org.eclipse.jdt.core.compiler.annotation.nullable.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
-org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
-org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.7
-org.eclipse.jdt.core.compiler.debug.lineNumber=generate
-org.eclipse.jdt.core.compiler.debug.localVariable=generate
-org.eclipse.jdt.core.compiler.debug.sourceFile=generate
-org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=warning
-org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
-org.eclipse.jdt.core.compiler.problem.autoboxing=ignore
-org.eclipse.jdt.core.compiler.problem.comparingIdentical=warning
-org.eclipse.jdt.core.compiler.problem.deadCode=warning
-org.eclipse.jdt.core.compiler.problem.deprecation=warning
-org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled
-org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled
-org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore
-org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore
-org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
-org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore
-org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled
-org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore
-org.eclipse.jdt.core.compiler.problem.finalParameterBound=warning
-org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=warning
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore
-org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=warning
-org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled
-org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=warning
-org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=warning
-org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore
-org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore
-org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=warning
-org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore
-org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled
-org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=enabled
-org.eclipse.jdt.core.compiler.problem.missingSerialVersion=warning
-org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore
-org.eclipse.jdt.core.compiler.problem.noEffectAssignment=warning
-org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=warning
-org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore
-org.eclipse.jdt.core.compiler.problem.nonnullParameterAnnotationDropped=warning
-org.eclipse.jdt.core.compiler.problem.nonnullTypeVariableFromLegacyInvocation=warning
-org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error
-org.eclipse.jdt.core.compiler.problem.nullReference=warning
-org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error
-org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=warning
-org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=warning
-org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.pessimisticNullAnalysisForFreeTypeVariables=warning
-org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore
-org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore
-org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore
-org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled
-org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=warning
-org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=disabled
-org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled
-org.eclipse.jdt.core.compiler.problem.syntacticNullAnalysisForFields=disabled
-org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore
-org.eclipse.jdt.core.compiler.problem.typeParameterHiding=warning
-org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=enabled
-org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore
-org.eclipse.jdt.core.compiler.problem.unclosedCloseable=warning
-org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore
-org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore
-org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore
-org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=disabled
-org.eclipse.jdt.core.compiler.problem.unusedExceptionParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedImport=warning
-org.eclipse.jdt.core.compiler.problem.unusedLabel=warning
-org.eclipse.jdt.core.compiler.problem.unusedLocal=warning
-org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=disabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled
-org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
-org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.7
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=0
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_method_declaration=0
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=false
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true
-org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries=true
-org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments=false
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off
-org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=999
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
-org.eclipse.jdt.core.formatter.tabulation.char=space
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_on_off_tags=false
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
-org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
-org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs b/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
deleted file mode 100644
index d521bab..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
+++ /dev/null
@@ -1,7 +0,0 @@
-eclipse.preferences.version=1
-formatter_profile=_Space Indent & Long Lines
-formatter_settings_version=12
-org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;org;com;
-org.eclipse.jdt.ui.ondemandthreshold=99
-org.eclipse.jdt.ui.staticondemandthreshold=99
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
deleted file mode 100644
index 876279d..0000000
--- a/engine-streaming/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>kylin-engine-streaming</artifactId>
- <packaging>jar</packaging>
- <name>Apache Kylin - Streaming Engine</name>
-
- <parent>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin</artifactId>
- <version>1.6.0-SNAPSHOT</version>
-
- </parent>
-
- <properties>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-cube</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-storage</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-job</artifactId>
- </dependency>
-
- <!-- Env & Test -->
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- <!-- protobuf version conflict with hbase -->
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.mrunit</groupId>
- <artifactId>mrunit</artifactId>
- <classifier>hadoop2</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
deleted file mode 100644
index 35bdfa8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming;
-
-/**
- */
-public class BootstrapConfig {
-
- private String cubeName;
- private long start = 0L;
- private long end = 0L;
-
- private boolean fillGap;
- private long maxFillGapRange = 4 * 3600 * 1000L;
-
- public long getStart() {
- return start;
- }
-
- public void setStart(long start) {
- this.start = start;
- }
-
- public long getEnd() {
- return end;
- }
-
- public void setEnd(long end) {
- this.end = end;
- }
-
- public String getCubeName() {
- return cubeName;
- }
-
- public void setCubeName(String cubeName) {
- this.cubeName = cubeName;
- }
-
- public boolean isFillGap() {
- return fillGap;
- }
-
- public void setFillGap(boolean fillGap) {
- this.fillGap = fillGap;
- }
-
- public long getMaxFillGapRange() {
- return maxFillGapRange;
- }
-
- public void setMaxFillGapRange(long maxFillGapRange) {
- this.maxFillGapRange = maxFillGapRange;
- }
-}
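The BootstrapConfig removed above is a plain holder for the parameters of a one-off streaming build. A minimal usage sketch (not part of this commit; cube name and time window are made up for illustration):

import org.apache.kylin.engine.streaming.BootstrapConfig;

public class BootstrapConfigSketch {
    public static void main(String[] args) {
        // Hypothetical cube name and a one-hour window, for illustration only.
        BootstrapConfig config = new BootstrapConfig();
        config.setCubeName("demo_streaming_cube");
        config.setStart(1476057600000L);              // window start, epoch millis
        config.setEnd(1476057600000L + 3600 * 1000L); // window end, epoch millis
        config.setFillGap(false);                     // do not back-fill missing ranges

        System.out.println(config.getCubeName() + ": " + config.getStart() + " -> " + config.getEnd());
    }
}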
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
deleted file mode 100644
index c583283..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-/**
- */
-public interface IStreamingInput {
-
- StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime);
-
-}
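IStreamingInput is the extension point through which a streaming source hands a time-bounded micro-batch to the engine. A hypothetical no-op implementation, shown only to make the contract concrete (the real implementation is the Kafka source; class and package names below are invented for the sketch):

package org.apache.kylin.engine.streaming.sample; // hypothetical package, for the sketch only

import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.engine.streaming.IStreamingInput;
import org.apache.kylin.metadata.realization.RealizationType;

public class NoopStreamingInput implements IStreamingInput {

    @Override
    public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName,
            int id, long startTime, long endTime) {
        // A real implementation would pull messages whose timestamps fall in
        // [startTime, endTime) and wrap them into a StreamingBatch; returning
        // null keeps this sketch free of source-specific details.
        return null;
    }
}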
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
deleted file mode 100644
index cb15e2b..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-
-/**
- */
-public interface IStreamingOutput {
-
- ICuboidWriter getCuboidWriter(IBuildable buildable);
-
- void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult);
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
deleted file mode 100644
index c9da46e..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.engine.streaming.util.StreamingUtils;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class OneOffStreamingBuilder {
-
- private final IStreamingInput streamingInput;
- private final IStreamingOutput streamingOutput;
- private final StreamingBatchBuilder streamingBatchBuilder;
- private final long startTime;
- private final long endTime;
- private final RealizationType realizationType;
- private final String realizationName;
-
- public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) {
- Preconditions.checkArgument(startTime < endTime);
- this.startTime = startTime;
- this.endTime = endTime;
- this.realizationType = Preconditions.checkNotNull(realizationType);
- this.realizationName = Preconditions.checkNotNull(realizationName);
- this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput());
- this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput());
- this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName));
- }
-
- public Runnable build() {
- return new Runnable() {
- @Override
- public void run() {
- StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime);
- final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch);
- final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch);
- final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable);
- streamingBatchBuilder.build(streamingBatch, dictionaryMap, streamingOutput.getCuboidWriter(buildable));
- streamingOutput.output(buildable, samplingResult);
- streamingBatchBuilder.commit(buildable);
- }
- };
- }
-
-}
\ No newline at end of file
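OneOffStreamingBuilder wires input, batch builder and output into a single Runnable. Assuming the streaming input/output implementations are resolvable through StreamingUtils (as the constructor requires), a one-off build could be driven as sketched below; the cube name and time window are illustrative, and RealizationType.CUBE is assumed to be the relevant enum constant:

import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
import org.apache.kylin.metadata.realization.RealizationType;

public class OneOffBuildSketch {
    public static void main(String[] args) {
        long start = 1476057600000L;     // illustrative window start (epoch millis)
        long end = start + 3600 * 1000L; // illustrative window end, one hour later

        // Builds a Runnable that reads one micro-batch, samples it, builds
        // dictionaries and cuboids, then outputs and commits the segment.
        Runnable oneOffBuild = new OneOffStreamingBuilder(
                RealizationType.CUBE, "demo_streaming_cube", start, end).build();
        oneOffBuild.run();
    }
}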
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
deleted file mode 100644
index 8b0b8e6..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- */
-public interface StreamingBatchBuilder {
-
- IBuildable createBuildable(StreamingBatch streamingBatch);
-
- Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch);
-
- Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable);
-
- void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter);
-
- void commit(IBuildable buildable);
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
deleted file mode 100644
index 9d1a0b1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class StreamingConfig extends RootPersistentEntity {
-
- public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
- public static final String STREAMING_TYPE_KAFKA = "kafka";
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("type")
- private String type = STREAMING_TYPE_KAFKA;
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getResourcePath() {
- return concatResourcePath(name);
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public static String concatResourcePath(String name) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
- }
-
- @Override
- public StreamingConfig clone() {
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SERIALIZER.serialize(this, new DataOutputStream(baos));
- return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
- } catch (IOException e) {
- throw new RuntimeException(e);//in mem, should not happen
- }
- }
-
-}
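StreamingConfig above is a small persisted entity keyed by name; its metadata lives under ResourceStore.STREAMING_RESOURCE_ROOT as <name>.json. A short sketch of how the resource path is derived (the printed prefix depends on the actual value of STREAMING_RESOURCE_ROOT; the name is made up):

import org.apache.kylin.engine.streaming.StreamingConfig;

public class StreamingConfigSketch {
    public static void main(String[] args) {
        StreamingConfig config = new StreamingConfig();
        config.setName("demo_kafka_stream"); // hypothetical streaming source name
        // getResourcePath() delegates to concatResourcePath(name), i.e.
        // STREAMING_RESOURCE_ROOT + "/" + name + ".json"
        System.out.println(config.getResourcePath());
    }
}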
[6/6] kylin git commit: KYLIN-1726 package rename
Posted by sh...@apache.org.
KYLIN-1726 package rename
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67fa740
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67fa740
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67fa740
Branch: refs/heads/master
Commit: c67fa740d364d372c7a6424fd160570cc7e890c4
Parents: 5aee022
Author: shaofengshi <sh...@apache.org>
Authored: Sun Oct 9 13:24:59 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/kafka/KafkaMRInput.java | 3 +
.../kylin/source/kafka/MergeOffsetStep.java | 80 -----------
.../kylin/source/kafka/SeekOffsetStep.java | 140 ------------------
.../source/kafka/StringStreamingParser.java | 51 -------
.../apache/kylin/source/kafka/TopicMeta.java | 46 ------
.../kylin/source/kafka/UpdateTimeRangeStep.java | 117 ---------------
.../kylin/source/kafka/job/MergeOffsetStep.java | 80 +++++++++++
.../kylin/source/kafka/job/SeekOffsetStep.java | 141 +++++++++++++++++++
.../source/kafka/job/UpdateTimeRangeStep.java | 117 +++++++++++++++
.../kafka/util/ByteBufferBackedInputStream.java | 6 +-
10 files changed, 344 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 6358ee1..4d1f5c9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+import org.apache.kylin.source.kafka.job.SeekOffsetStep;
+import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
import javax.annotation.Nullable;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
deleted file mode 100644
index 18c959a..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class MergeOffsetStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
- public MergeOffsetStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
-
- Collections.sort(mergingSegs);
-
- final CubeSegment first = mergingSegs.get(0);
- final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
-
- segment.setSourceOffsetStart(first.getSourceOffsetStart());
- segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
- segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
- segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
-
- long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
- long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
-
- segment.setDateRangeStart(dateRangeStart);
- segment.setDateRangeEnd(dateRangeEnd);
-
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
deleted file mode 100644
index 151b912..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SeekOffsetStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
-
- public SeekOffsetStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
- Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
- Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
-
- if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
- }
-
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
- final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
- if (startOffsets.isEmpty()) {
- // user didn't specify start offset, use the biggest offset in existing segments as start
- for (CubeSegment seg : cube.getSegments()) {
- Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
- for (PartitionInfo partition : partitionInfos) {
- int partitionId = partition.partition();
- if (segEndOffset.containsKey(partitionId)) {
- startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
- }
- }
- }
-
- if (partitionInfos.size() > startOffsets.size()) {
- // has new partition added
- for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
- long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
- startOffsets.put(partitionInfos.get(x).partition(), earliest);
- }
- }
-
- logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
- }
-
- if (endOffsets.isEmpty()) {
- // user didn't specify end offset, use latest offset in kafka
- for (PartitionInfo partitionInfo : partitionInfos) {
- long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
- endOffsets.put(partitionInfo.partition(), latest);
- }
-
- logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
- }
- }
-
- long totalStartOffset = 0, totalEndOffset = 0;
- for (Long v : startOffsets.values()) {
- totalStartOffset += v;
- }
- for (Long v : endOffsets.values()) {
- totalEndOffset += v;
- }
-
- if (totalEndOffset > totalStartOffset) {
- segment.setSourceOffsetStart(totalStartOffset);
- segment.setSourceOffsetEnd(totalEndOffset);
- segment.setSourcePartitionOffsetStart(startOffsets);
- segment.setSourcePartitionOffsetEnd(endOffsets);
- segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
- } else {
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(segment);
- try {
- cubeManager.updateCube(cubeBuilder);
- } catch (IOException e) {
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
-
- return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
- }
-
-
- }
-
-}
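In the relocated SeekOffsetStep, the decision to keep or discard the segment rests on the per-partition offset totals. The same arithmetic, isolated from the Kafka and cube plumbing and using made-up partition offsets:

import java.util.HashMap;
import java.util.Map;

public class OffsetTotalsSketch {
    public static void main(String[] args) {
        // Hypothetical per-partition offsets (partition id -> offset).
        Map<Integer, Long> startOffsets = new HashMap<>();
        Map<Integer, Long> endOffsets = new HashMap<>();
        startOffsets.put(0, 100L);  endOffsets.put(0, 150L);
        startOffsets.put(1, 200L);  endOffsets.put(1, 230L);

        long totalStartOffset = 0, totalEndOffset = 0;
        for (Long v : startOffsets.values()) {
            totalStartOffset += v;
        }
        for (Long v : endOffsets.values()) {
            totalEndOffset += v;
        }

        if (totalEndOffset > totalStartOffset) {
            // The step would keep the segment and encode the offset range in its name.
            System.out.println("message count: " + (totalEndOffset - totalStartOffset));
        } else {
            // No new messages: the step discards the segment instead.
            System.out.println("No new message comes");
        }
    }
}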
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
deleted file mode 100644
index f74df83..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public final class StringStreamingParser extends StreamingParser {
-
- public static final StringStreamingParser instance = new StringStreamingParser(null, null);
-
- private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
- }
-
- @Override
- public StreamingMessage parse(ByteBuffer message) {
- byte[] bytes = new byte[message.limit()];
- message.get(bytes);
- return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
- }
-
- @Override
- public boolean filter(StreamingMessage streamingMessage) {
- return true;
- }
-}
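StringStreamingParser, removed above, simply splits a raw message on commas and never filters anything. A tiny usage sketch with an illustrative payload (the expected column values are noted in the comment):

import java.nio.ByteBuffer;

import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.StringStreamingParser;

public class StringParserSketch {
    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("2016-10-10,US,42".getBytes());
        // Produces a StreamingMessage carrying ["2016-10-10", "US", "42"]
        // with offset and timestamp both set to 0.
        StreamingMessage message = StringStreamingParser.instance.parse(payload);
        System.out.println(message);
    }
}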
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
deleted file mode 100644
index a73543e..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- */
-public class TopicMeta {
-
- private final String name;
-
- private final List<Integer> partitionIds;
-
- public TopicMeta(String name, List<Integer> partitionIds) {
- this.name = name;
- this.partitionIds = Collections.unmodifiableList(partitionIds);
- }
-
- public String getName() {
- return name;
- }
-
- public List<Integer> getPartitionIds() {
- return partitionIds;
- }
-}
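TopicMeta, also removed, is an immutable pairing of a topic name with its partition ids; the partition list is wrapped unmodifiable, which is what keeps the topic metadata invariant. For illustration (topic name and partitions are made up):

import java.util.Arrays;

import org.apache.kylin.source.kafka.TopicMeta;

public class TopicMetaSketch {
    public static void main(String[] args) {
        // Hypothetical topic with three partitions.
        TopicMeta meta = new TopicMeta("demo_events", Arrays.asList(0, 1, 2));
        System.out.println(meta.getName() + " partitions: " + meta.getPartitionIds());
        // meta.getPartitionIds().add(3) would throw UnsupportedOperationException,
        // since the list is wrapped with Collections.unmodifiableList.
    }
}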
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
deleted file mode 100644
index 9e902d8..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class UpdateTimeRangeStep extends AbstractExecutable {
-
- private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
-
- public UpdateTimeRangeStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
- final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
- final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- final Path outputFile = new Path(outputPath, partitionCol.getName());
-
- String minValue = null, maxValue = null, currentValue = null;
- FSDataInputStream inputStream = null;
- BufferedReader bufferedReader = null;
- try {
- FileSystem fs = HadoopUtil.getFileSystem(outputPath);
- inputStream = fs.open(outputFile);
- bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
- minValue = currentValue = bufferedReader.readLine();
- while (currentValue != null) {
- maxValue = currentValue;
- currentValue = bufferedReader.readLine();
- }
- } catch (IOException e) {
- logger.error("fail to read file " + outputFile, e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } finally {
- IOUtils.closeQuietly(bufferedReader);
- IOUtils.closeQuietly(inputStream);
- }
-
- final DataType partitionColType = partitionCol.getType();
- FastDateFormat dateFormat;
- if (partitionColType.isDate()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
- } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
- dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- } else if (partitionColType.isStringFamily()) {
- String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
- if (StringUtils.isEmpty(partitionDateFormat)) {
- partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
- }
- dateFormat = DateFormat.getDateFormat(partitionDateFormat);
- } else {
- return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
- }
-
- try {
- long startTime = dateFormat.parse(minValue).getTime();
- long endTime = dateFormat.parse(maxValue).getTime();
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- segment.setDateRangeStart(startTime);
- segment.setDateRangeEnd(endTime);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (Exception e) {
- logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
new file mode 100644
index 0000000..9cadd72
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Merges the source offsets and date ranges of the merging segments into the new, merged segment.
+ */
+public class MergeOffsetStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
+ public MergeOffsetStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+ List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+
+ Collections.sort(mergingSegs);
+
+ final CubeSegment first = mergingSegs.get(0);
+ final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
+
+ segment.setSourceOffsetStart(first.getSourceOffsetStart());
+ segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+ segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+ segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+
+ long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
+ long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
+
+ segment.setDateRangeStart(dateRangeStart);
+ segment.setDateRangeEnd(dateRangeEnd);
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to update cube segment offset", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+}
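
For readers skimming the patch: MergeOffsetStep's offset arithmetic reduces to taking the start offsets of the first sorted segment and the end offsets of the last one. A minimal standalone sketch of that rule follows; SegRange and mergeSketch are illustrative stand-ins and are not part of the Kylin API.

import java.util.Arrays;
import java.util.List;

public class MergeOffsetSketch {

    // Stand-in for the offset fields carried by a CubeSegment.
    static class SegRange {
        final long start;
        final long end;
        SegRange(long start, long end) { this.start = start; this.end = end; }
    }

    // The merged segment spans from the first segment's start to the last segment's end,
    // assuming the list is already sorted, as Collections.sort(mergingSegs) does in the step.
    static SegRange mergeSketch(List<SegRange> sortedSegs) {
        return new SegRange(sortedSegs.get(0).start, sortedSegs.get(sortedSegs.size() - 1).end);
    }

    public static void main(String[] args) {
        List<SegRange> segs = Arrays.asList(new SegRange(0, 500), new SegRange(500, 1200));
        SegRange merged = mergeSketch(segs);
        System.out.println(merged.start + " -> " + merged.end); // prints 0 -> 1200
    }
}
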
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
new file mode 100644
index 0000000..5751095
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/** Seeks the Kafka start/end offsets for the segment when they were not explicitly provided.
+ */
+public class SeekOffsetStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
+
+ public SeekOffsetStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+ Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
+ Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
+
+ if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
+ }
+
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
+ final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+ final String topic = kafkaConfig.getTopic();
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+
+ if (startOffsets.isEmpty()) {
+ // user didn't specify start offset, use the biggest offset in existing segments as start
+ for (CubeSegment seg : cube.getSegments()) {
+ Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
+ for (PartitionInfo partition : partitionInfos) {
+ int partitionId = partition.partition();
+ if (segEndOffset.containsKey(partitionId)) {
+ startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
+ }
+ }
+ }
+
+ if (partitionInfos.size() > startOffsets.size()) {
+ // new partitions have been added
+ for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
+ long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+ startOffsets.put(partitionInfos.get(x).partition(), earliest);
+ }
+ }
+
+ logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+ }
+
+ if (endOffsets.isEmpty()) {
+ // user didn't specify end offset, use latest offset in kafka
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+ endOffsets.put(partitionInfo.partition(), latest);
+ }
+
+ logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
+ }
+ }
+
+ long totalStartOffset = 0, totalEndOffset = 0;
+ for (Long v : startOffsets.values()) {
+ totalStartOffset += v;
+ }
+ for (Long v : endOffsets.values()) {
+ totalEndOffset += v;
+ }
+
+ if (totalEndOffset > totalStartOffset) {
+ segment.setSourceOffsetStart(totalStartOffset);
+ segment.setSourceOffsetEnd(totalEndOffset);
+ segment.setSourcePartitionOffsetStart(startOffsets);
+ segment.setSourcePartitionOffsetEnd(endOffsets);
+ segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
+ } else {
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(segment);
+ try {
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new messages have arrived");
+ }
+
+
+ }
+
+}
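
SeekOffsetStep decides whether a build should proceed by comparing the sums of the per-partition start and end offsets; if the end total is not larger, the segment is discarded. A minimal sketch of that comparison follows, assuming plain maps in place of CubeSegment's offset fields (totalOffset is an illustrative name, not Kylin API).

import java.util.HashMap;
import java.util.Map;

public class OffsetTotalsSketch {

    // Sum of offsets across all partitions; the step compares these totals
    // to decide whether any new messages exist between start and end.
    static long totalOffset(Map<Integer, Long> offsets) {
        long sum = 0;
        for (long v : offsets.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<Integer, Long> start = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        start.put(0, 100L);
        start.put(1, 250L);
        end.put(0, 180L);
        end.put(1, 250L);

        long messageCount = totalOffset(end) - totalOffset(start); // 80 messages, all on partition 0
        System.out.println(messageCount > 0
                ? "build segment, " + messageCount + " new messages"
                : "discard, no new messages");
    }
}
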
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
new file mode 100644
index 0000000..d19aa63
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Updates the segment's date range from the min/max partition-column values extracted during the build.
+ */
+public class UpdateTimeRangeStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
+
+ public UpdateTimeRangeStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+ final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+ final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+ final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+ String minValue = null, maxValue = null, currentValue = null;
+ FSDataInputStream inputStream = null;
+ BufferedReader bufferedReader = null;
+ try {
+ FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+ inputStream = fs.open(outputFile);
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ minValue = currentValue = bufferedReader.readLine();
+ while (currentValue != null) {
+ maxValue = currentValue;
+ currentValue = bufferedReader.readLine();
+ }
+ } catch (IOException e) {
+ logger.error("fail to read file " + outputFile, e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ } finally {
+ IOUtils.closeQuietly(bufferedReader);
+ IOUtils.closeQuietly(inputStream);
+ }
+
+ final DataType partitionColType = partitionCol.getType();
+ FastDateFormat dateFormat;
+ if (partitionColType.isDate()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+ } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+ dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+ } else if (partitionColType.isStringFamily()) {
+ String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+ if (StringUtils.isEmpty(partitionDateFormat)) {
+ partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+ }
+ dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+ } else {
+ return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+ }
+
+ try {
+ long startTime = dateFormat.parse(minValue).getTime();
+ long endTime = dateFormat.parse(maxValue).getTime();
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ segment.setDateRangeStart(startTime);
+ segment.setDateRangeEnd(endTime);
+ cubeBuilder.setToUpdateSegs(segment);
+ cubeManager.updateCube(cubeBuilder);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (Exception e) {
+ logger.error("fail to update cube segment offset", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+}
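
UpdateTimeRangeStep derives the segment's date range from the first and last lines of the sorted partition-column output file. A minimal sketch of that parsing step follows; the "yyyy-MM-dd" pattern and the literal values are assumptions standing in for DateFormat.DEFAULT_DATE_PATTERN and the real output file.

import java.text.ParseException;
import java.text.SimpleDateFormat;

public class TimeRangeSketch {

    public static void main(String[] args) throws ParseException {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");

        String minValue = "2016-10-01"; // first line of the sorted output file
        String maxValue = "2016-10-09"; // last line of the sorted output file

        long startTime = fmt.parse(minValue).getTime();
        long endTime = fmt.parse(maxValue).getTime();

        System.out.println("segment date range: " + startTime + " .. " + endTime);
    }
}
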
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
index 7a42598..894a144 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.kafka.util;
import java.io.IOException;