You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/12 10:55:56 UTC
[2/3] incubator-kylin git commit: KYLIN-826
KYLIN-826
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/db3aafcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/db3aafcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/db3aafcb
Branch: refs/heads/0.8
Commit: db3aafcb2233ccd55cc0f84606c7b5a92b4e8b69
Parents: c96a02c
Author: honma <ho...@ebay.com>
Authored: Fri Jun 12 15:39:34 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jun 12 16:55:10 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 9 +++
.../kylin/job/streaming/KafkaDataLoader.java | 44 ++++++++----
.../apache/kylin/job/BasicLocalMetaTest.java | 24 +++++++
.../kylin/job/BuildCubeWithStreamTest.java | 64 +++++++++--------
.../streaming/StreamingTableDataGenerator.java | 73 ++++++++++++++++++++
.../broadcaster/BroadcasterReceiveServlet.java | 10 +--
.../kylin/streaming/KafkaClusterConfig.java | 4 ++
.../kylin/streaming/StreamingManager.java | 12 +++-
.../kylin/streaming/OneOffStreamProducer.java | 1 +
9 files changed, 191 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 01a22c1..2f0590b 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.common.util;
import com.google.common.collect.Lists;
+import com.google.common.collect.TreeMultiset;
import org.apache.commons.configuration.ConfigurationException;
import org.junit.Assert;
import org.junit.Ignore;
@@ -62,6 +63,14 @@ public class BasicTest {
@Test
public void test0() throws Exception {
+ TreeMultiset<Long> xx = TreeMultiset.create();
+ xx.add(2L);
+ xx.add(1L);
+ xx.add(1L);
+ for(Long hi : xx)
+ {
+ System.out.println(hi);
+ }
System.out.println(Long.MAX_VALUE);
IdentityHashMap<String, Void> a = new IdentityHashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index eb47d15..da2c711 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -1,34 +1,44 @@
package org.apache.kylin.job.streaming;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
import org.apache.kylin.streaming.StreamingConfig;
import org.apache.kylin.streaming.StreamingManager;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
/**
+ * Load prepared data into Kafka (for test use).
*/
public class KafkaDataLoader {
- /**
- *
- * @param args args[0] data file path, args[1] streaming name
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
- StreamingConfig streamingConfig = streamingManager.getStreamingConfig(args[1]);
- List<String> alldata = FileUtils.readLines(new File(args[0]));
+ public static void loadIntoKafka(String streamName, List<String> messages) {
+
+ StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
+ StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streamName);
+ KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+ String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+ @Nullable
+ @Override
+ public String apply(BrokerConfig brokerConfig) {
+ return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+ }
+ }), ",");
Properties props = new Properties();
- props.put("metadata.broker.list", "sandbox:6667");
+ props.put("metadata.broker.list", brokerList);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
@@ -36,10 +46,20 @@ public class KafkaDataLoader {
Producer<String, String> producer = new Producer<String, String>(config);
- for (int i = 0; i < alldata.size(); ++i) {
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), alldata.get(i));
+ for (int i = 0; i < messages.size(); ++i) {
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
producer.send(data);
}
producer.close();
}
+
+ /**
+ *
+ * @param args args[0] data file path, args[1] streaming name
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ List<String> alldata = FileUtils.readLines(new File(args[0]));
+ loadIntoKafka(args[1], alldata);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java b/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java
new file mode 100644
index 0000000..d034a73
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java
@@ -0,0 +1,24 @@
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class BasicLocalMetaTest extends LocalFileMetadataTestCase {
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void basicTest() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 9dca76a..ec94c56 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,18 +34,11 @@
package org.apache.kylin.job;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
@@ -67,25 +60,30 @@ import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.StreamingManager;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
- *
- * This class is going to be deleted
+ * for streaming cubing case "test_streaming_table"
*/
-@Ignore("For dev testing")
public class BuildCubeWithStreamTest {
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
@@ -98,6 +96,7 @@ public class BuildCubeWithStreamTest {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+
}
@Before
@@ -108,6 +107,14 @@ public class BuildCubeWithStreamTest {
kylinConfig = KylinConfig.getInstanceFromEnv();
cubeManager = CubeManager.getInstance(kylinConfig);
+ // Use a random topic for the Kafka data stream
+ StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig("test_streaming_table_table");
+ streamingConfig.setTopic(UUID.randomUUID().toString());
+ StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+ }
+
+ private void loadKafkaData() {
+
}
@After
@@ -125,7 +132,7 @@ public class BuildCubeWithStreamTest {
CubeSegment cubeSegment = cube.getSegment("19700101000000_20150401000000", SegmentStatusEnum.NEW);
Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-//
+ //
for (DimensionDesc dim : desc.getDimensions()) {
// dictionary
for (TblColRef col : dim.getColumnRefs()) {
@@ -140,15 +147,14 @@ public class BuildCubeWithStreamTest {
}
}
-// final String tableName = createIntermediateTable(desc, kylinConfig, null);
+ // final String tableName = createIntermediateTable(desc, kylinConfig, null);
String tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a24dec89_efbd_425f_9a5f_8b78dd1412af"; // has 3089 records;
-// tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
-// tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
-// tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
+ // tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
+ // tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
+ // tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150401000000_fb7ae579_d987_4900_a3b7_c60c731cd269"; // 2 million records
logger.info("intermediate table name:" + tableName);
-
ArrayBlockingQueue queue = new ArrayBlockingQueue<List<String>>(10000);
InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
@@ -166,7 +172,7 @@ public class BuildCubeWithStreamTest {
row = reader.getRowAsList();
queue.put(row);
counter++;
- if(counter == 200000)
+ if (counter == 200000)
break;
}
queue.put(new ArrayList<String>(0));
@@ -182,7 +188,6 @@ public class BuildCubeWithStreamTest {
logger.info("stream build finished");
}
-
private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
@@ -216,7 +221,6 @@ public class BuildCubeWithStreamTest {
}
-
class ConsoleGTRecordWriter implements ICuboidWriter {
boolean verbose = false;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
new file mode 100644
index 0000000..394b56f
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -0,0 +1,73 @@
+package org.apache.kylin.job.streaming;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generates fact table data for test_streaming_table.
+ */
+public class StreamingTableDataGenerator {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public static List<String> generate(int recordCount, long startTime, long endTime) {
+ Preconditions.checkArgument(startTime < endTime);
+ Preconditions.checkArgument(recordCount > 0);
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc("streaming_table");
+
+ SortedMultiset<Long> times = TreeMultiset.create();
+ Random r = new Random();
+ for (int i = 0; i < recordCount; i++) {
+ long t = startTime + (long) ((endTime - startTime) * r.nextDouble());
+ times.add(t);
+ }
+
+ List<String> ret = Lists.newArrayList();
+ HashMap<String, String> kvs = Maps.newHashMap();
+ for (long time : times) {
+ kvs.clear();
+ kvs.put("timestamp", String.valueOf(time));
+ for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+ DataType dataType = columnDesc.getType();
+ if (dataType.isDateTimeFamily()) {
+ continue;
+ } else if (dataType.isStringFamily()) {
+ char c = (char) ('A' + (int) (26 * r.nextDouble()));
+ kvs.put(columnDesc.getName(), String.valueOf(c));
+ } else if (dataType.isIntegerFamily()) {
+ int v = r.nextInt(10000);
+ kvs.put(columnDesc.getName(), String.valueOf(v));
+ } else if (dataType.isNumberFamily()) {
+ String v = String.format("%.4f", r.nextDouble() * 100);
+ kvs.put(columnDesc.getName(), v);
+ }
+ }
+ try {
+ ret.add(mapper.writeValueAsString(kvs));
+ } catch (JsonProcessingException e) {
+ logger.error("error!",e);
+ }
+ }
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java b/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
index ed63183..a68f67a 100644
--- a/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
+++ b/server/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
@@ -18,14 +18,13 @@
package org.apache.kylin.rest.broadcaster;
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
*/
@@ -33,7 +32,7 @@ public class BroadcasterReceiveServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- public static interface BroadcasterHandler {
+ public interface BroadcasterHandler {
void handle(String type, String name, String event);
}
@@ -45,6 +44,7 @@ public class BroadcasterReceiveServlet extends HttpServlet {
}
private static final Pattern PATTERN = Pattern.compile("/(.+)/(.+)/(.+)");
+
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
handle(req, resp);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
index ff34dd9..6a4b389 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaClusterConfig.java
@@ -41,6 +41,10 @@ public class KafkaClusterConfig extends RootPersistentEntity {
return streamingConfig.getMaxReadCount();
}
+ public List<BrokerConfig> getBrokerConfigs() {
+ return brokerConfigs;
+ }
+
public List<Broker> getBrokers() {
return Lists.transform(brokerConfigs, new Function<BrokerConfig, Broker>() {
@Nullable
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index 3552edc..b4ecb0e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -72,7 +72,6 @@ public class StreamingManager {
return ResourceStore.getStore(this.config);
}
-
public static StreamingManager getInstance(KylinConfig config) {
StreamingManager r = CACHE.get(config);
if (r != null) {
@@ -109,7 +108,6 @@ public class StreamingManager {
return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
}
-
public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
try {
getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
@@ -129,6 +127,15 @@ public class StreamingManager {
}
}
+ public void saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+ if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
+ throw new IllegalArgumentException();
+ }
+
+ String path = formatStreamingConfigPath(streamingConfig.getName());
+ getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+ }
+
public long getOffset(String streaming, int shard) {
final String resPath = formatStreamingOutputPath(streaming, shard);
try {
@@ -189,5 +196,4 @@ public class StreamingManager {
private final ObjectMapper mapper = new ObjectMapper();
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db3aafcb/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index 93da99f..3cafe3f 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -53,6 +53,7 @@ import java.io.IOException;
import java.util.Properties;
/**
+ * for tests
*/
public class OneOffStreamProducer {