You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 03:01:50 UTC
[03/50] [abbrv] kylin git commit: KYLIN-2072 Cleanup old streaming
code
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
deleted file mode 100644
index 1c579c6..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Counts observations into time buckets delimited by a list of boundaries.
- *
- * Each bucket is backed by its own AtomicLong, so individual increments are atomic.
- * The boundaries are expected in ascending order; this is not checked, and
- * {@link #process(long)} simply picks the first boundary greater than the value.
- */
-public class TimeHistogram {
-    /** Shared by all histograms so that status reports are not interleaved on stdout. */
-    private static final Object printLock = new Object();
-
-    private final long[] bucketsBoundary;
-    private final AtomicLong[] counters;
-    private final String id;
-
-    /**
-     * example: [10,20] will generate three buckets: (-\u221e,10), [10,20),[20,+\u221e)
-     * unit: second
-     *
-     * @param bucketsBoundary bucket boundaries in seconds; copied, so later changes to the
-     *                        caller's array do not affect this histogram
-     * @param id              label printed in the status header
-     */
-    public TimeHistogram(long[] bucketsBoundary, String id) {
-        // Defensive copy: bucket edges must stay fixed for the life of the histogram.
-        this.bucketsBoundary = bucketsBoundary.clone();
-        this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
-        for (int i = 0; i < counters.length; i++) {
-            this.counters[i] = new AtomicLong();
-        }
-        this.id = id;
-    }
-
-    /**
-     * Increments the counter of the first bucket whose upper boundary is greater than the value,
-     * or of the last (open-ended) bucket when no boundary is greater.
-     *
-     * @param second in seconds
-     */
-    public void process(long second) {
-        for (int i = 0; i < bucketsBoundary.length; ++i) {
-            if (second < bucketsBoundary[i]) {
-                counters[i].incrementAndGet();
-                return;
-            }
-        }
-
-        counters[bucketsBoundary.length].incrementAndGet();
-    }
-
-    /**
-     * Same as {@link #process(long)} after converting to whole seconds by integer division.
-     *
-     * @param millis in milli seconds
-     */
-    public void processMillis(long millis) {
-        process(millis / 1000);
-    }
-
-    /**
-     * Prints one line per bucket (index, count, share of the total) to standard output.
-     * The counters are read one by one before printing, so the report reflects those reads
-     * rather than a single atomic snapshot.
-     */
-    public void printStatus() {
-        long[] countersSnapshot = new long[counters.length];
-        long sum = 0;
-        for (int i = 0; i < countersSnapshot.length; i++) {
-            countersSnapshot[i] = counters[i].get();
-            sum += countersSnapshot[i];
-        }
-        // Avoid dividing by zero when nothing has been recorded yet.
-        final long divisor = sum == 0 ? 1 : sum;
-
-        synchronized (printLock) {
-            System.out.println("============== status of TimeHistogram " + id + " =================");
-
-            for (int i = 0; i < countersSnapshot.length; ++i) {
-                System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / divisor));
-            }
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..7a42598
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Exposes the remaining bytes of a {@link ByteBuffer} as an {@link InputStream}.
+ *
+ * Reading from the stream advances the position of the wrapped buffer itself (the buffer is
+ * not copied or duplicated), so the caller's buffer is consumed as the stream is read.
+ */
+public class ByteBufferBackedInputStream extends InputStream {
+
+    private final ByteBuffer buf;
+
+    /**
+     * @param buf buffer to read from, starting at its current position
+     */
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    /**
+     * @return the next byte as a value in 0..255, or -1 when the buffer has no bytes remaining
+     */
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        // Mask so that bytes >= 0x80 are not returned as negative values (which would look like EOF).
+        return buf.get() & 0xFF;
+    }
+
+    /**
+     * Copies up to {@code len} bytes from the buffer into {@code bytes} starting at {@code off}.
+     *
+     * @return the number of bytes copied, 0 when {@code len} is 0, or -1 when the buffer is exhausted
+     * @throws NullPointerException      if {@code bytes} is null
+     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
+     *                                   {@code len} exceeds {@code bytes.length - off}
+     */
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        // Validate up front so bad arguments fail the same way whether or not data remains.
+        if (off < 0 || len < 0 || len > bytes.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        // InputStream contract: a zero-length read returns 0, even at end of stream.
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        final int count = Math.min(len, buf.remaining());
+        buf.get(bytes, off, count);
+        return count;
+    }
+
+    /**
+     * @return the number of bytes remaining in the wrapped buffer
+     */
+    @Override
+    public int available() throws IOException {
+        return buf.remaining();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
deleted file mode 100644
index bce9bb9..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-import kafka.cluster.BrokerEndPoint;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.source.kafka.TopicMeta;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.TopicMetadataResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Static helpers that send topic-metadata, fetch and offset requests to Kafka brokers.
- *
- * Requests go through SimpleConsumer instances that are cached per broker endpoint, timeout,
- * buffer size and client id. The cached consumers are closed by {@link #shutdown()}, which is
- * also registered as a JVM shutdown hook when this class is initialized.
- */
-public final class KafkaRequester {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class);
-
-    /**
-     * Fetch size passed to every fetch request. May need to be increased if large batches are
-     * written to Kafka; per the original note, 1048576 is the default value on shell.
-     */
-    private static final int FETCH_SIZE_BYTES = 1048576;
-
-    private static final ConcurrentMap<String, SimpleConsumer> consumerCache = Maps.newConcurrentMap();
-
-    static {
-        // Close every cached consumer when the JVM exits.
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KafkaRequester.shutdown();
-            }
-        }));
-    }
-
-    /**
-     * Returns the cached consumer for the given settings, creating and caching one if absent.
-     */
-    private static SimpleConsumer getSimpleConsumer(Broker broker, int timeout, int bufferSize, String clientId) {
-        final String key = createKey(broker, timeout, bufferSize, clientId);
-        final SimpleConsumer cached = consumerCache.get(key);
-        if (cached != null) {
-            return cached;
-        }
-
-        final BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT);
-        final SimpleConsumer created = new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId);
-        final SimpleConsumer winner = consumerCache.putIfAbsent(key, created);
-        if (winner != null) {
-            // Another thread cached a consumer first; close ours so it does not leak.
-            created.close();
-            return winner;
-        }
-        return created;
-    }
-
-    /** Builds the cache key: PLAINTEXT connection string, timeout, buffer size and client id. */
-    private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) {
-        return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId;
-    }
-
-    /**
-     * Asks the configured brokers, in order, for the metadata of the configured topic.
-     * A broker that cannot be reached is skipped; an unexpected or erroneous response stops the search.
-     *
-     * @return the topic name with its partition ids, or null if the topic metadata could not be obtained
-     */
-    public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) {
-        SimpleConsumer consumer;
-        for (Broker broker : kafkaClusterConfig.getBrokers()) {
-            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic());
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp;
-            try {
-                resp = consumer.send(req);
-            } catch (Exception e) {
-                // Pass the exception as well so the stack trace is not lost.
-                logger.warn("cannot send TopicMetadataRequest successfully: " + e, e);
-                continue;
-            }
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                break;
-            }
-            List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() {
-                @Nullable
-                @Override
-                public Integer apply(PartitionMetadata partitionMetadata) {
-                    return partitionMetadata.partitionId();
-                }
-            });
-            return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds);
-        }
-        logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic());
-        return null;
-    }
-
-    /**
-     * Asks the given brokers, in order, for the topic metadata and returns the entry of one partition.
-     *
-     * @return the metadata of the partition with the given id, or null if it could not be found
-     */
-    public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
-        logger.debug("Brokers: " + brokers.toString());
-        SimpleConsumer consumer;
-        for (Broker broker : brokers) {
-            consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup");
-            List<String> topics = Collections.singletonList(topic);
-            TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp;
-            try {
-                resp = consumer.send(req);
-            } catch (Exception e) {
-                logger.warn("cannot send TopicMetadataRequest successfully: " + e, e);
-                continue;
-            }
-            final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
-            if (topicMetadatas.size() != 1) {
-                logger.warn("invalid topicMetadata size:" + topicMetadatas.size());
-                break;
-            }
-            final TopicMetadata topicMetadata = topicMetadatas.get(0);
-            if (topicMetadata.errorCode() != 0) {
-                logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode());
-                break;
-            }
-            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
-                // Separate the fields so the log line is readable.
-                StringBuilder logText = new StringBuilder();
-                logText.append("PartitionMetadata debug errorCode: ").append(partitionMetadata.errorCode());
-                logText.append(", partitionId: ").append(partitionMetadata.partitionId());
-                logText.append(", leader: ").append(partitionMetadata.leader());
-                logText.append(", ISR: ").append(partitionMetadata.isr());
-                logText.append(", replica: ").append(partitionMetadata.replicas());
-                logger.info(logText.toString());
-                if (partitionMetadata.partitionId() == partitionId) {
-                    return partitionMetadata;
-                }
-            }
-        }
-        logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId);
-        return null;
-    }
-
-    /**
-     * Fetches messages of one partition from the given broker, starting at the given offset.
-     */
-    public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
-        final String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
-        kafka.api.FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, FETCH_SIZE_BYTES).build();
-        return consumer.fetch(req);
-    }
-
-    /**
-     * Requests a single offset of the partition before {@code whichTime} from the given broker.
-     *
-     * @return the first offset in the response, or 0 when the response reports an error
-     */
-    public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) {
-        String clientName = "client_" + topic + "_" + partitionId;
-        SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName);
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-
-        if (response.hasError()) {
-            logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
-            return 0;
-        }
-        long[] offsets = response.offsets(topic, partitionId);
-        return offsets[0];
-    }
-
-    /** Closes every cached consumer. The cache itself is not cleared. */
-    public static void shutdown() {
-        for (SimpleConsumer simpleConsumer : consumerCache.values()) {
-            simpleConsumer.close();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
deleted file mode 100644
index ee5bb20..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.util;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.source.kafka.StreamingParser;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.message.MessageAndOffset;
-
-/**
- * Static helpers built on KafkaRequester: locating a partition's lead broker, reading the
- * first and last offsets, and searching for the offset whose message timestamp is closest to
- * a target timestamp.
- */
-public final class KafkaUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class);
-
-    /** Number of attempts made by getKafkaMessage before giving up. */
-    private static final int MAX_RETRY_TIMES = 6;
-
-    private KafkaUtils() {
-    }
-
-    /**
-     * Looks up the leader of the given partition of the configured topic.
-     *
-     * @return a Broker built from the partition's leader, or null if no partition metadata was found
-     */
-    public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-        if (partitionMetadata != null) {
-            if (partitionMetadata.errorCode() != 0) {
-                // Only logged; the leader is still returned.
-                logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode());
-            }
-            return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Sleeps 2^retryTimes seconds (exponential back-off between retries).
-     *
-     * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted;
-     *                          the thread's interrupt status is restored first
-     */
-    private static void sleep(int retryTimes) {
-        int seconds = (int) Math.pow(2, retryTimes);
-        logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds");
-        try {
-            Thread.sleep(seconds * 1000L);
-        } catch (InterruptedException e) {
-            // Restore the flag so code further up the stack can still see the interruption.
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Fetches the message at the given offset from the partition's lead broker, retrying with
-     * back-off when the leader is unknown, the fetch reports an error or the message set is empty.
-     *
-     * @throws IllegalStateException if no message could be fetched after MAX_RETRY_TIMES attempts
-     */
-    private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
-        final String topic = kafkaClusterConfig.getTopic();
-        int retry = 0;
-        while (retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds
-            final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-            if (leadBroker == null) {
-                logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
-                sleep(retry++);
-                continue;
-            }
-            final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
-            if (response.errorCode(topic, partitionId) != 0) {
-                logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
-                sleep(retry++);
-                continue;
-            }
-            final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator();
-            if (!iterator.hasNext()) {
-                logger.warn("messageSet is empty");
-                sleep(retry++);
-                continue;
-            }
-            return iterator.next();
-        }
-        throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
-    }
-
-    /**
-     * Searches between the partition's first and last offsets for the offset whose parsed
-     * message timestamp is closest to the given timestamp.
-     */
-    public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamingParser streamingParser) {
-        Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
-        final String topic = kafkaClusterConfig.getTopic();
-
-        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));
-        final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamingParser);
-        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
-        return result;
-    }
-
-    /**
-     * @return the pair (offset reported for EarliestTime, offset reported for LatestTime minus one)
-     * @throws NullPointerException if the lead broker of the partition cannot be found
-     */
-    public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final String topic = kafkaClusterConfig.getTopic();
-        final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
-        final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
-        final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
-        return Pair.newPair(earliestOffset, latestOffset);
-    }
-
-    /**
-     * Binary search over [startOffset, endOffset] on the message timestamps. Timestamps already
-     * looked up are cached per offset for the duration of one search.
-     */
-    private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamingParser streamingParser) {
-        Map<Long, Long> cache = Maps.newHashMap();
-
-        while (startOffset < endOffset) {
-            // hard to ensure startTimestamp <= midTimestamp <= endTimestamp, so it is not asserted
-            long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamingParser, cache);
-            if (startTimestamp >= targetTimestamp) {
-                return startOffset;
-            }
-            long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamingParser, cache);
-            if (endTimestamp <= targetTimestamp) {
-                return endOffset;
-            }
-            // Only fetch the middle message once the boundary checks have not settled the search.
-            long midOffset = startOffset + ((endOffset - startOffset) >> 1);
-            long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamingParser, cache);
-            if (targetTimestamp == midTimestamp) {
-                return midOffset;
-            } else if (targetTimestamp < midTimestamp) {
-                endOffset = midOffset - 1;
-            } else {
-                startOffset = midOffset + 1;
-            }
-        }
-        return startOffset;
-    }
-
-    /** Cached variant of getDataTimestamp: consults and fills the given offset-to-timestamp map. */
-    private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser, Map<Long, Long> cache) {
-        final Long known = cache.get(offset);
-        if (known != null) {
-            return known;
-        }
-        long t = getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamingParser);
-        cache.put(offset, t);
-        return t;
-    }
-
-    /**
-     * Fetches the message at the given offset, parses its payload with the given parser and
-     * returns the timestamp of the parsed message.
-     *
-     * @throws IllegalStateException if the message cannot be fetched (see getKafkaMessage)
-     */
-    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser) {
-        final String topic = kafkaClusterConfig.getTopic();
-        final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
-        // The payload goes straight to the parser; the former byte[] copy of it was never used.
-        final ByteBuffer payload = messageAndOffset.message().payload();
-        final StreamingMessage streamingMessage = streamingParser.parse(payload);
-        streamingMessage.setOffset(messageAndOffset.offset());
-        logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
-        return streamingMessage.getTimestamp();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
index fb33059..5a52b61 100644
--- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java
+++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
@@ -20,7 +20,8 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import com.fasterxml.jackson.databind.type.MapType;
@@ -32,7 +33,6 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 1d33071..23e7239 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -41,10 +41,6 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
- </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
deleted file mode 100644
index 9adaf24..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.storage.hbase.steps;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
-import org.apache.kylin.engine.streaming.IStreamingOutput;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * IStreamingOutput implementation that writes cuboids of a cube segment to an HBase table
- * (and to a sequence file) and stores the sampled cuboid statistics in the resource store.
- */
-public class HBaseStreamingOutput implements IStreamingOutput {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
-
-    /**
-     * Creates the segment's HTable and returns a compound writer that feeds both an
-     * HBaseCuboidWriter and a SequenceFileCuboidWriter.
-     *
-     * @param buildable must be a CubeSegment
-     * @throws RuntimeException wrapping any IOException raised while setting up the writers
-     */
-    @Override
-    public ICuboidWriter getCuboidWriter(IBuildable buildable) {
-        final CubeSegment segment = (CubeSegment) buildable;
-        try {
-            final HTableInterface table = createHTable(segment);
-            final List<ICuboidWriter> writers = Lists.newArrayList();
-            writers.add(new HBaseCuboidWriter(segment, table));
-            writers.add(new SequenceFileCuboidWriter(segment.getCubeDesc(), segment));
-            return new CompoundCuboidWriter(writers);
-        } catch (IOException ioe) {
-            throw new RuntimeException("failed to get ICuboidWriter", ioe);
-        }
-    }
-
-    /**
-     * Writes the sampling result to a uniquely named local directory, uploads the resulting
-     * estimation file to the segment's statistics path in the resource store, and then
-     * deletes the local directory.
-     *
-     * @param buildable      must be a CubeSegment
-     * @param samplingResult counters passed to CuboidStatsUtil.writeCuboidStatistics
-     * @throws RuntimeException wrapping any IOException raised along the way
-     */
-    @Override
-    public void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult) {
-        final CubeSegment segment = (CubeSegment) buildable;
-        try {
-            final KylinConfig config = KylinConfig.getInstanceFromEnv();
-            final Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
-            final Path localDir = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString());
-            CuboidStatsUtil.writeCuboidStatistics(hadoopConf, localDir, samplingResult, 100);
-
-            FSDataInputStream statsStream = null;
-            try {
-                final Path statsFile = new Path(localDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
-                statsStream = FileSystem.getLocal(hadoopConf).open(statsFile);
-                final ResourceStore store = ResourceStore.getStore(config);
-                store.putResource(segment.getStatisticsResourcePath(), statsStream, System.currentTimeMillis());
-            } finally {
-                // Always release the stream and remove the temporary local output.
-                IOUtils.closeQuietly(statsStream);
-                FileSystem.getLocal(hadoopConf).delete(localDir, true);
-            }
-        } catch (IOException ioe) {
-            throw new RuntimeException("failed to write sampling result", ioe);
-        }
-    }
-
-    /**
-     * Creates the HBase table of the segment and returns a handle to it, obtained from the
-     * connection for the configured storage URL.
-     */
-    private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException {
-        final String tableName = cubeSegment.getStorageLocationIdentifier();
-        CubeHTableUtil.createHTable(cubeSegment, null);
-        final String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
-        final HTableInterface table = HBaseConnection.get(storageUrl).getTable(tableName);
-        logger.info("hTable:" + tableName + " for segment:" + cubeSegment.getName() + " created!");
-        return table;
-    }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index f4fb308..9cb135a 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -35,8 +35,8 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.exception.PersistentException;