You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 11:53:50 UTC

incubator-kylin git commit: add UT & rm useless interface

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 8ef41be4e -> ff8bfd6cf


add UT & rm useless interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ff8bfd6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ff8bfd6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ff8bfd6c

Branch: refs/heads/0.8.0
Commit: ff8bfd6cf7d404a33f7c37a2838d34392163853a
Parents: 8ef41be
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 5 15:01:43 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 17:53:53 2015 +0800

----------------------------------------------------------------------
 .../streaming/PeriodicalStreamBuilderTest.java  | 133 +++++++++++++++++++
 .../apache/kylin/streaming/BatchCondition.java  |   1 -
 .../kylin/streaming/LimitedSizeCondition.java   |   4 -
 .../kylin/streaming/TimePeriodCondition.java    |   4 -
 4 files changed, 133 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ff8bfd6c/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
new file mode 100644
index 0000000..0eec947
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
@@ -0,0 +1,133 @@
+package org.apache.kylin.job.streaming;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.streaming.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+/** Runs a periodical StreamBuilder over two in-memory queues of timestamped messages and checks the offsets stored for "test".
+ */
+public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
+
+    @Before
+    public void setup() { // runs before each test: creates the test metadata via the base class
+        this.createTestMetadata();
+
+    }
+
+    @After
+    public void clear() { // runs after each test: cleans up the test metadata via the base class
+        this.cleanupTestMetadata();
+    }
+
+    private List<StreamMessage> prepareTestData(long start, long end, int count) { // builds count messages whose payloads are timestamps from start to end; NOTE(review): not called anywhere in this class
+        double step = (double)(end - start) / (count - 1); // NOTE(review): count == 1 makes the divisor zero (step becomes Infinity or NaN)
+        long ts = start;
+        int offset = 0;
+        ArrayList<StreamMessage> result = Lists.newArrayList();
+        for (int i = 0; i < count - 1; ++i) { // emits the first count - 1 messages; the last one is added after the loop
+            result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes())); // payload is the decimal timestamp; getBytes() uses the platform default charset
+            ts += step; // compound assignment narrows the double sum back to long on every iteration
+        }
+        result.add(new StreamMessage(offset++, String.valueOf(end).getBytes())); // final message carries exactly end, independent of accumulated truncation
+        assertEquals(count, result.size());
+        assertEquals(start+"", new String(result.get(0).getRawData()));
+        assertEquals(end+"", new String(result.get(count - 1).getRawData()));
+        return result;
+    }
+
+    @Test
+    public void test() throws ExecutionException, InterruptedException { // feeds both queues in real time, then compares the stored offsets with the offset recorded at the period boundary
+
+        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+        queues.add(new LinkedBlockingQueue<StreamMessage>());
+        queues.add(new LinkedBlockingQueue<StreamMessage>());
+
+        final long interval = 3000L; // compared against System.currentTimeMillis() below, so this is milliseconds
+        final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval); // presumably the next interval-aligned boundary after now; confirm against TimeUtil
+
+        final List<Integer> partitionIds = Lists.newArrayList();
+        for (int i = 0; i < queues.size(); i++) { // one partition id per queue, equal to the queue's index
+            partitionIds.add(i);
+        }
+
+        final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() { // consumer that only logs; it performs no assertions
+            @Override
+            public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+                logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
+            }
+
+            @Override
+            public void stop() {
+                logger.info("consumer stopped");
+            }
+        };
+        final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
+
+        streamBuilder.setStreamParser(new StreamParser() { // parser that reads each raw payload as a decimal long timestamp
+            @Override
+            public ParsedStreamMessage parse(StreamMessage streamMessage) {
+                return new ParsedStreamMessage(Collections.<String>emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true); // NOTE(review): meaning of the trailing true is not visible here; confirm against ParsedStreamMessage
+            }
+        });
+
+        Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder); // NOTE(review): the executor reference is dropped and never shut down
+        long timeout = nextPeriodStart + interval; // end of the first period
+        int messageCount = 0;
+        int inPeriodMessageCount = 0;
+        int expectedOffset = 0;
+        logger.info("prepare to add StreamMessage");
+        while (true) {
+            long ts = System.currentTimeMillis();
+            if (ts > timeout + interval) { // keep producing until one more interval has elapsed past the first period's end
+                break;
+            }
+            if (ts >= nextPeriodStart && ts < timeout) { // counts iterations stamped inside [nextPeriodStart, timeout); only logged, never asserted
+                inPeriodMessageCount++;
+            }
+            for (BlockingQueue<StreamMessage> queue : queues) {
+                queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes())); // every queue receives the same offset and timestamp
+            }
+            if (expectedOffset == 0 && ts > timeout) { // first iteration stamped after the period end
+                expectedOffset = messageCount - 1; // offset of the previous message, i.e. the last one not stamped after timeout
+            }
+            messageCount++;
+            Thread.sleep(10); // roughly one message per 10 ms per queue
+        }
+        logger.info("totally put " + messageCount + " StreamMessages");
+        logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
+
+        for (BlockingQueue<StreamMessage> queue : queues) {
+            queue.put(StreamMessage.EOF); // presumably tells the builder to stop reading; confirm against StreamBuilder
+        }
+
+        future.get(); // blocks until the builder task finishes; a failure in the task surfaces as ExecutionException
+
+        for (BlockingQueue<StreamMessage> queue : queues) {
+            queue.take(); // removes one element per queue and blocks if the queue is empty; presumably the leftover EOF, TODO confirm
+        }
+
+        final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
+        logger.info("offset:" + offsets);
+        for (Long offset : offsets.values()) { // every returned partition offset must equal the offset recorded at the period boundary
+            assertEquals(expectedOffset, offset.longValue());
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ff8bfd6c/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
index 0fa11c1..4979039 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -12,5 +12,4 @@ public interface BatchCondition {
 
     Result apply(ParsedStreamMessage message);
 
-    BatchCondition copy();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ff8bfd6c/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
index 3c1e367..c99c75b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -22,8 +22,4 @@ public class LimitedSizeCondition implements BatchCondition {
         }
     }
 
-    @Override
-    public BatchCondition copy() {
-        return new LimitedSizeCondition(this.limit);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ff8bfd6c/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
index fdd35fc..7752437 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -23,8 +23,4 @@ public class TimePeriodCondition implements BatchCondition {
         }
     }
 
-    @Override
-    public BatchCondition copy() {
-        return this;
-    }
 }