You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:20 UTC
[08/50] [abbrv] incubator-kylin git commit: KYLIN-749 new test case added;
data needs to be inserted into Kafka manually
KYLIN-749 new test case added; data needs to be inserted into Kafka manually
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/09aba285
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/09aba285
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/09aba285
Branch: refs/heads/streaming-localdict
Commit: 09aba285da10fb1fe266d0ff1813c169bf570d80
Parents: 67a5937
Author: honma <ho...@ebay.com>
Authored: Tue May 5 16:41:47 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue May 5 16:41:47 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/DateFormat.java | 6 +--
.../org/apache/kylin/common/util/JsonUtil.java | 9 ++--
.../org/apache/kylin/common/util/SortUtil.java | 21 ++++++++
.../invertedindex/test_streaming_table.json | 4 +-
.../test_streaming_table_model_desc.json | 1 +
.../streaming/test_streaming_table.json | 20 ++++++++
.../kylin/job/streaming/KafkaDataLoader.java | 46 +++++++++++++++++
.../java/org/apache/kylin/job/DataGenTest.java | 52 ++++++++++++++++++--
.../job/dataGen/StreamingDataGenerator.java | 36 ++++++++------
.../apache/kylin/metadata/model/ColumnDesc.java | 2 +-
.../kylin/streaming/JsonStreamParser.java | 8 +--
11 files changed, 169 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 7f12ac5..d184a24 100644
--- a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -15,7 +15,7 @@ public class DateFormat {
static final private Map<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
- static SimpleDateFormat getDateFormat(String datePattern) {
+ public static SimpleDateFormat getDateFormat(String datePattern) {
ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
if (formatThreadLocal == null) {
threadLocalMap.put(datePattern, formatThreadLocal = new ThreadLocal<SimpleDateFormat>());
@@ -37,10 +37,6 @@ public class DateFormat {
return getDateFormat(DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).format(new Date(millis));
}
- public static String dateToString(Date date) {
- return dateToString(date, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- }
-
public static String dateToString(Date date, String pattern) {
return getDateFormat(pattern).format(date);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
index 2a1e314..5b3a22c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -18,12 +18,6 @@
package org.apache.kylin.common.util;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Reader;
-
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -32,12 +26,15 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import java.io.*;
+
public class JsonUtil {
// reuse the object mapper to save memory footprint
private static final ObjectMapper mapper = new ObjectMapper();
private static final ObjectMapper indentMapper = new ObjectMapper();
+
static {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
indentMapper.configure(SerializationFeature.INDENT_OUTPUT, true);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/common/src/main/java/org/apache/kylin/common/util/SortUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/SortUtil.java b/common/src/main/java/org/apache/kylin/common/util/SortUtil.java
new file mode 100644
index 0000000..ac9f216
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/SortUtil.java
@@ -0,0 +1,51 @@
+package org.apache.kylin.common.util;
+
+import com.google.common.base.Function;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helpers for ordering elements by a key derived from each element.
+ *
+ * Created by Hongbin Ma(Binmahone) on 5/5/15.
+ */
+public class SortUtil {
+
+    /**
+     * Drains {@code input} and returns an iterator over all of its elements, ordered by the key
+     * {@code extractor} computes for each element; elements with equal keys are ordered by their
+     * own natural ordering. Every element is retained, including duplicates.
+     *
+     * @param input     elements to sort; fully consumed by this call
+     * @param extractor computes the sort key of an element; keys must be non-null and mutually comparable
+     * @return an iterator over a sorted snapshot of the input
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <T extends Comparable, E extends Comparable> Iterator<T> extractAndSort(Iterator<T> input, Function<T, E> extractor) {
+        List<Map.Entry<E, T>> keyed = new ArrayList<Map.Entry<E, T>>();
+        while (input.hasNext()) {
+            T element = input.next();
+            keyed.add(new AbstractMap.SimpleImmutableEntry<E, T>(extractor.apply(element), element));
+        }
+
+        Collections.sort(keyed, new Comparator<Map.Entry<E, T>>() {
+            @Override
+            public int compare(Map.Entry<E, T> a, Map.Entry<E, T> b) {
+                int byKey = a.getKey().compareTo(b.getKey());
+                return byKey != 0 ? byKey : a.getValue().compareTo(b.getValue());
+            }
+        });
+
+        List<T> sorted = new ArrayList<T>(keyed.size());
+        for (Map.Entry<E, T> entry : keyed) {
+            sorted.add(entry.getValue());
+        }
+        return sorted.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
index dc341aa..cbcca12 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table.json
@@ -8,11 +8,11 @@
"segments": [
{
"uuid": null,
- "name": "19700101000000_20140901000000",
+ "name": "19700101000000_20190901000000",
"status": "READY",
"dictionaries": {
},
- "storage_location_identifier": "",
+ "storage_location_identifier": "KYLIN_2STEAMTEST",
"date_range_start": 0,
"date_range_end": 0,
"size_kb": 0,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
index 66245ae..d0abb91 100644
--- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json
@@ -13,6 +13,7 @@
]
}
],
+ "lookups": [],
"metrics": [
"gmv",
"item_count"
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/examples/test_case_data/localmeta/streaming/test_streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table.json b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
new file mode 100644
index 0000000..537919a
--- /dev/null
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table.json
@@ -0,0 +1,20 @@
+{
+ "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
+ "name": "test_streaming_table",
+ "zookeeper": "sandbox:2181",
+ "topic": "test_streaming_table",
+ "timeout": 60000,
+ "maxReadCount": 1000,
+ "bufferSize": 65536,
+ "iiName": "test_streaming_table",
+ "parserName": "org.apache.kylin.streaming.JsonStreamParser",
+ "partition": 1,
+ "last_modified": 0,
+ "brokers": [
+ {
+ "id": 0,
+ "host": "sandbox",
+ "port": 6667
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..5577dce
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,64 @@
+package org.apache.kylin.job.streaming;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.streaming.KafkaConfig;
+import org.apache.kylin.streaming.StreamingManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Command-line tool that publishes every line of a text file as one message to the Kafka topic
+ * of a configured streaming table, e.g. to feed the streaming test case with data.
+ *
+ * Created by Hongbin Ma(Binmahone) on 5/5/15.
+ */
+public class KafkaDataLoader {
+
+    /** Broker list used when none is given on the command line. */
+    private static final String DEFAULT_BROKER_LIST = "sandbox:6667";
+
+    /**
+     * Sends each line of the data file to Kafka, keyed by its zero-based line number.
+     *
+     * @param args args[0] data file path (read as UTF-8), args[1] streaming name,
+     *             optional args[2] broker list ("host:port[,host:port...]", default "sandbox:6667")
+     * @throws IOException if the data file cannot be read
+     * @throws IllegalArgumentException if required arguments are missing or no Kafka config
+     *                                  is found for the streaming name
+     */
+    public static void main(String[] args) throws IOException {
+        if (args.length < 2) {
+            throw new IllegalArgumentException("usage: KafkaDataLoader <data file> <streaming name> [broker list]");
+        }
+        StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
+        KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(args[1]);
+        if (kafkaConfig == null) {
+            throw new IllegalArgumentException("no kafka config found for streaming '" + args[1] + "'");
+        }
+
+        List<String> alldata = FileUtils.readLines(new File(args[0]), "UTF-8");
+
+        Properties props = new Properties();
+        // TODO(review): the broker list could be derived from the streaming config's "brokers" instead
+        props.put("metadata.broker.list", args.length > 2 ? args[2] : DEFAULT_BROKER_LIST);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
+        try {
+            for (int i = 0; i < alldata.size(); ++i) {
+                producer.send(new KeyedMessage<String, String>(kafkaConfig.getTopic(), String.valueOf(i), alldata.get(i)));
+            }
+        } finally {
+            // always close the producer, even if a send fails
+            producer.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index c05ddc7..4e3cf17 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -18,15 +18,28 @@
package org.apache.kylin.job;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.base.Function;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SortUtil;
import org.apache.kylin.job.dataGen.FactTableGenerator;
+import org.apache.kylin.job.dataGen.StreamingDataGenerator;
+import org.apache.kylin.metadata.MetadataManager;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
/**
* Created by honma on 6/19/14.
@@ -56,6 +69,43 @@ public class DataGenTest extends LocalFileMetadataTestCase {
@Test
public void testStreaming() throws Exception {
+ int totalCount = 10000;
+ int counter = 0;
+
+ Iterator<String> iterator = StreamingDataGenerator.generate(DateFormat.stringToMillis("2015-01-03"), DateFormat.stringToMillis("2015-01-06"), totalCount);
+
+ // Sort numerically by timestamp so the order matches the numeric check below;
+ // sorting the raw "ts" strings would put e.g. "999" after "1000".
+ iterator = SortUtil.extractAndSort(iterator, new Function<String, Long>() {
+ @Override
+ public Long apply(String input) {
+ return getTs(input);
+ }
+ });
+
+ long lastTs = 0;
+ while (iterator.hasNext()) {
+ counter++;
+ String row = iterator.next();
+ System.out.println(row);
+ long ts = getTs(row);
+ Assert.assertTrue(ts >= lastTs);
+ lastTs = ts;
+ }
+ Assert.assertEquals(totalCount, counter);
}
+ final JavaType javaType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+ final ObjectMapper objectMapper = new ObjectMapper();
+
+ /** Parses one generated JSON row and returns its "ts" field as a long. */
+ private long getTs(String input) {
+ Map<String, String> json;
+ try {
+ json = objectMapper.readValue(input, javaType);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return Long.parseLong(json.get("ts"));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
index d822a9a..bf915e6 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
@@ -1,17 +1,18 @@
package org.apache.kylin.job.dataGen;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -25,23 +26,23 @@ public class StreamingDataGenerator {
private static Random random = new Random();
private static String[] decimalFormat = new String[] { "%.4f", "%.5f", "%.6f" };
- public static Iterator<List<String>> generate(final long start, final long end, final int count) {
+ public static Iterator<String> generate(final long start, final long end, final int count) {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table");
final IIDesc iiDesc = ii.getDescriptor();
- final MetadataManager metadataManager = MetadataManager.getInstance(config);
- final ColumnDesc[] columnDescs = metadataManager.getTableDesc(iiDesc.getFactTableName()).getColumns();
+ final List<TblColRef> columns = iiDesc.listAllColumns();
- return new Iterator<List<String>>() {
- private Map<String, String> values = Maps.newHashMap();
+ return new Iterator<String>() {
+ private Map<String, String> values = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ private int index = 0;
@Override
public boolean hasNext() {
- return false;
+ return this.index < count;
}
@Override
- public List<String> next() {
+ public String next() {
values.clear();
long ts = this.createTs(start, end);
values.put("ts", Long.toString(ts));
@@ -53,16 +54,21 @@ public class StreamingDataGenerator {
values.put("gmv", String.format(decimalFormat[random.nextInt(3)], random.nextFloat() * 100));
values.put("item_count", Integer.toString(random.nextInt(5)));
- if (values.size() != columnDescs.length) {
+ if (values.size() != columns.size()) {
throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
}
- List<String> ret = Lists.newArrayList();
- for (ColumnDesc columnDesc : columnDescs) {
- String name = columnDesc.getName();
- ret.add(values.get(name));
+ String row;
+ try {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ JsonUtil.writeValue(os, values);
+ // decode with UTF-8 (the encoding Jackson writes), not the platform default charset
+ row = os.toString("UTF-8");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return ret;
+ index++;
+ return row;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 8d7ebfd..203d304 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -73,7 +73,8 @@ public class ColumnDesc {
}
public void setDatatype(String datatype) {
- logger.info("setting datatype to " + datatype);
+ // debug rather than info: useful for diagnosis but too noisy at info level
+ logger.debug("setting datatype to " + datatype);
this.datatype = datatype;
type = DataType.getInstance(datatype);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/09aba285/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index d762a14..d229647 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -34,6 +34,7 @@
package org.apache.kylin.streaming;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.SimpleType;
@@ -54,18 +55,19 @@ import java.util.Map;
public final class JsonStreamParser implements StreamParser {
private static final Logger logger = LoggerFactory.getLogger(JsonStreamParser.class);
+ private static final JavaType javaType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+ private static final ObjectMapper objectMapper = new ObjectMapper();
private final List<TblColRef> allColumns;
- public JsonStreamParser(List<TblColRef> allColumns){
+ public JsonStreamParser(List<TblColRef> allColumns) {
this.allColumns = allColumns;
}
@Override
public List<String> parse(StreamMessage streamMessage) {
try {
- Map<String, String> json = new ObjectMapper().readValue(
- streamMessage.getRawData(), MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)));
+ Map<String, String> json = objectMapper.readValue(streamMessage.getRawData(), javaType);
ArrayList<String> result = Lists.newArrayList();
for (TblColRef column : allColumns) {
for (Map.Entry<String, String> entry : json.entrySet()) {