You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:26 UTC

[14/50] [abbrv] incubator-kylin git commit: KYLIN-749 new test case added; data needs to be manually inserted into Kafka

KYLIN-749 new test case added; data needs to be manually inserted into Kafka


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c8bc428a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c8bc428a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c8bc428a

Branch: refs/heads/streaming-localdict
Commit: c8bc428a93d8506d79ae008ccc5f7d4dbef5cc72
Parents: 23c61d2
Author: honma <ho...@ebay.com>
Authored: Tue May 5 16:41:47 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed May 6 10:16:48 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/DateFormat.java    |  6 +--
 .../org/apache/kylin/common/util/JsonUtil.java  |  9 ++--
 .../org/apache/kylin/common/util/SortUtil.java  | 21 ++++++++
 .../invertedindex/test_streaming_table.json     |  4 +-
 .../test_streaming_table_model_desc.json        |  1 +
 .../streaming/test_streaming_table.json         | 20 ++++++++
 .../kylin/job/streaming/KafkaDataLoader.java    | 46 +++++++++++++++++
 .../java/org/apache/kylin/job/DataGenTest.java  | 52 ++++++++++++++++++--
 .../job/dataGen/StreamingDataGenerator.java     | 36 ++++++++------
 .../apache/kylin/metadata/model/ColumnDesc.java |  2 +-
 .../kylin/streaming/JsonStreamParser.java       |  8 +--
 11 files changed, 169 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 7f12ac5..d184a24 100644
--- a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -15,7 +15,7 @@ public class DateFormat {
 
     static final private Map<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
 
-    static SimpleDateFormat getDateFormat(String datePattern) {
+    public static SimpleDateFormat getDateFormat(String datePattern) {
         ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
         if (formatThreadLocal == null) {
             threadLocalMap.put(datePattern, formatThreadLocal = new ThreadLocal<SimpleDateFormat>());
@@ -37,10 +37,6 @@ public class DateFormat {
         return getDateFormat(DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).format(new Date(millis));
     }
 
-    public static String dateToString(Date date) {
-        return dateToString(date, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-    }
-
     public static String dateToString(Date date, String pattern) {
         return getDateFormat(pattern).format(date);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
index 2a1e314..5b3a22c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -18,12 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Reader;
-
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -32,12 +26,15 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 
+import java.io.*;
+
 public class JsonUtil {
 
     // reuse the object mapper to save memory footprint
     private static final ObjectMapper mapper = new ObjectMapper();
     private static final ObjectMapper indentMapper = new ObjectMapper();
 
+
     static {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         indentMapper.configure(SerializationFeature.INDENT_OUTPUT, true);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/common/src/main/java/org/apache/kylin/common/util/SortUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/SortUtil.java b/common/src/main/java/org/apache/kylin/common/util/SortUtil.java
new file mode 100644
index 0000000..ac9f216
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/SortUtil.java
@@ -0,0 +1,21 @@
+package org.apache.kylin.common.util;
+
+import com.google.common.base.Function;
+import com.google.common.collect.TreeMultimap;
+
+import java.util.Iterator;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/5/15.
+ */
+public class SortUtil {
+    public static <T extends Comparable, E extends Comparable> Iterator<T> extractAndSort(Iterator<T> input, Function<T, E> extractor) {
+        TreeMultimap<E, T> reorgnized = TreeMultimap.create();
+        while (input.hasNext()) {
+            T t = input.next();
+            E e = extractor.apply(t);
+            reorgnized.put(e, t);
+        }
+        return reorgnized.values().iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
index dc341aa..cbcca12 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
@@ -8,11 +8,11 @@
   "segments": [
     {
       "uuid": null,
-      "name": "19700101000000_20140901000000",
+      "name": "19700101000000_20190901000000",
       "status": "READY",
       "dictionaries": {
       },
-      "storage_location_identifier": "",
+      "storage_location_identifier": "KYLIN_2STEAMTEST",
       "date_range_start": 0,
       "date_range_end": 0,
       "size_kb": 0,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
index 66245ae..d0abb91 100644
--- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
@@ -13,6 +13,7 @@
       ]
     }
   ],
+  "lookups": [],
   "metrics": [
     "gmv",
     "item_count"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/examples/test_case_data/localmeta/streaming/test_streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table.json b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
new file mode 100644
index 0000000..537919a
--- /dev/null
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
@@ -0,0 +1,20 @@
+{
+  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
+  "name": "test_streaming_table",
+  "zookeeper": "sandbox:2181",
+  "topic": "test_streaming_table",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "iiName": "test_streaming_table",
+  "parserName": "org.apache.kylin.streaming.JsonStreamParser",
+  "partition": 1,
+  "last_modified": 0,
+  "brokers": [
+    {
+      "id": 0,
+      "host": "sandbox",
+      "port": 6667
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..5577dce
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,46 @@
+package org.apache.kylin.job.streaming;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.StreamingManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/5/15.
+ */
+public class KafkaDataLoader {
+    /**
+     *
+     * @param args args[0] data file path, args[1] streaming name
+     * @throws IOException
+     */
+    public static void main(String[] args) throws IOException {
+        StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
+        KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(args[1]);
+
+        List<String> alldata = FileUtils.readLines(new File(args[0]));
+
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "sandbox:6667");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        Producer<String, String> producer = new Producer<String, String>(config);
+
+        for (int i = 0; i < alldata.size(); ++i) {
+            KeyedMessage<String, String> data = new KeyedMessage<String, String>(kafkaConfig.getTopic(), String.valueOf(i), alldata.get(i));
+            producer.send(data);
+        }
+        producer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index c05ddc7..4e3cf17 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -18,15 +18,28 @@
 
 package org.apache.kylin.job;
 
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.base.Function;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SortUtil;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
+import org.apache.kylin.job.dataGen.StreamingDataGenerator;
+import org.apache.kylin.metadata.MetadataManager;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Created by honma on 6/19/14.
@@ -56,6 +69,39 @@ public class DataGenTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testStreaming() throws Exception {
+        int totalCount = 10000;
+        int counter = 0;
+
+        Iterator<String> iterator = StreamingDataGenerator.generate(DateFormat.stringToMillis("2015-01-03"), DateFormat.stringToMillis("2015-01-06"), totalCount);
+
+        iterator = SortUtil.extractAndSort(iterator, new Function<String, Comparable>() {
+            public Comparable apply(String input) {
+                return getTsStr(input);
+            }
+        });
+
+        long lastTs = 0;
+        while (iterator.hasNext()) {
+            counter++;
+            String row = iterator.next();
+            System.out.println(row);
+            long ts = Long.parseLong(getTsStr(row));
+            Assert.assertTrue(ts >= lastTs);
+            lastTs = ts;
+        }
+        Assert.assertEquals(totalCount, counter);
     }
 
+    final JavaType javaType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+    final ObjectMapper objectMapper = new ObjectMapper();
+
+    private String getTsStr(String input) {
+        Map<String, String> json ;
+        try {
+            json = objectMapper.readValue(input.getBytes(), javaType);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return json.get("ts");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
index d822a9a..bf915e6 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
@@ -1,17 +1,18 @@
 package org.apache.kylin.job.dataGen;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -25,23 +26,23 @@ public class StreamingDataGenerator {
     private static Random random = new Random();
     private static String[] decimalFormat = new String[] { "%.4f", "%.5f", "%.6f" };
 
-    public static Iterator<List<String>> generate(final long start, final long end, final int count) {
+    public static Iterator<String> generate(final long start, final long end, final int count) {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
         final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table");
         final IIDesc iiDesc = ii.getDescriptor();
-        final MetadataManager metadataManager = MetadataManager.getInstance(config);
-        final ColumnDesc[] columnDescs = metadataManager.getTableDesc(iiDesc.getFactTableName()).getColumns();
+        final List<TblColRef> columns = iiDesc.listAllColumns();
 
-        return new Iterator<List<String>>() {
-            private Map<String, String> values = Maps.newHashMap();
+        return new Iterator<String>() {
+            private Map<String, String> values = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+            private int index = 0;
 
             @Override
             public boolean hasNext() {
-                return false;
+                return this.index < count;
             }
 
             @Override
-            public List<String> next() {
+            public String next() {
                 values.clear();
                 long ts = this.createTs(start, end);
                 values.put("ts", Long.toString(ts));
@@ -53,16 +54,19 @@ public class StreamingDataGenerator {
                 values.put("gmv", String.format(decimalFormat[random.nextInt(3)], random.nextFloat() * 100));
                 values.put("item_count", Integer.toString(random.nextInt(5)));
 
-                if (values.size() != columnDescs.length) {
+                if (values.size() != columns.size()) {
                     throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
                 }
 
-                List<String> ret = Lists.newArrayList();
-                for (ColumnDesc columnDesc : columnDescs) {
-                    String name = columnDesc.getName();
-                    ret.add(values.get(name));
+                ByteArrayOutputStream os = new ByteArrayOutputStream();
+                try {
+                    JsonUtil.writeValue(os, values);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    throw new RuntimeException(e);
                 }
-                return ret;
+                index++;
+                return new String(os.toByteArray());
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 8d7ebfd..203d304 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -73,7 +73,7 @@ public class ColumnDesc {
     }
 
     public void setDatatype(String datatype) {
-        logger.info("setting datatype to " + datatype);
+        //logger.info("setting datatype to " + datatype);
         this.datatype = datatype;
         type = DataType.getInstance(datatype);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c8bc428a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index d762a14..d229647 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.streaming;
 
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.MapType;
 import com.fasterxml.jackson.databind.type.SimpleType;
@@ -54,18 +55,19 @@ import java.util.Map;
 public final class JsonStreamParser implements StreamParser {
 
     private static final Logger logger = LoggerFactory.getLogger(JsonStreamParser.class);
+    private static final JavaType javaType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+    private static final ObjectMapper objectMapper = new ObjectMapper();
 
     private final List<TblColRef> allColumns;
 
-    public JsonStreamParser(List<TblColRef> allColumns){
+    public JsonStreamParser(List<TblColRef> allColumns) {
         this.allColumns = allColumns;
     }
 
     @Override
     public List<String> parse(StreamMessage streamMessage) {
         try {
-            Map<String, String> json = new ObjectMapper().readValue(
-                    streamMessage.getRawData(), MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)));
+            Map<String, String> json = objectMapper.readValue(streamMessage.getRawData(), javaType);
             ArrayList<String> result = Lists.newArrayList();
             for (TblColRef column : allColumns) {
                 for (Map.Entry<String, String> entry : json.entrySet()) {