You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/16 13:14:41 UTC
[1/2] incubator-kylin git commit: KYLIN-826 streaming table case
build and verify result in KylinQueryTest
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 22cdfa2a8 -> 49e722ec2
KYLIN-826 streaming table case build and verify result in KylinQueryTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8c13aed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8c13aed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8c13aed0
Branch: refs/heads/0.8
Commit: 8c13aed0a92e21a399d08ed96c74e03e8dac5abe
Parents: c48962d
Author: honma <ho...@ebay.com>
Authored: Tue Jun 16 19:09:58 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 16 19:09:58 2015 +0800
----------------------------------------------------------------------
.../cube/test_streaming_table_cube.json | 2 +-
.../test_streaming_table_ii_desc.json | 3 +-
.../streaming/test_streaming_table_cube.json | 6 +-
.../localmeta/table/DEFAULT.SREAMING_TABLE.json | 2 +-
.../kylin/job/streaming/KafkaDataLoader.java | 26 +----
.../kylin/job/streaming/StreamingBootstrap.java | 1 -
.../kylin/job/tools/DeployCoprocessorCLI.java | 37 +++---
.../kylin/job/BuildCubeWithEngineTest.java | 4 +-
.../kylin/job/BuildCubeWithStreamTest.java | 17 +--
.../java/org/apache/kylin/job/DataGenTest.java | 56 +--------
.../kylin/job/DeployLocalMetaToRemoteTest.java | 67 +++++++++++
.../java/org/apache/kylin/job/DeployUtil.java | 57 ++++++----
.../kylin/job/dataGen/FactTableGenerator.java | 2 +-
.../job/dataGen/StreamingDataGenerator.java | 5 +-
.../streaming/StreamingTableDataGenerator.java | 16 +--
.../org/apache/kylin/query/test/H2Database.java | 36 ++----
.../kylin/query/test/ITKylinQueryTest.java | 9 +-
.../resources/query/sql_streaming/query01.sql | 1 +
.../resources/query/sql_streaming/query02.sql | 1 +
.../resources/query/sql_streaming/query03.sql | 1 +
.../resources/query/sql_streaming/query04.sql | 1 +
.../resources/query/sql_streaming/query05.sql | 1 +
.../resources/query/sql_streaming/query06.sql | 1 +
.../resources/query/sql_streaming/query07.sql | 1 +
.../resources/query/sql_streaming/query08.sql | 1 +
.../resources/query/sql_streaming/query09.sql | 1 +
.../resources/query/sql_streaming/query10.sql | 1 +
.../kylin/storage/hbase/HBaseKeyRange.java | 19 ++--
.../apache/kylin/streaming/StreamingUtil.java | 7 +-
.../kylin/streaming/TimedJsonStreamParser.java | 113 +++++++++++++++++++
30 files changed, 303 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json b/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
index 75fc09d..d0cf0d5 100644
--- a/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
+++ b/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
@@ -7,7 +7,7 @@
"status": "READY",
"segments": [],
"last_modified": 1433474940111,
- "descriptor": "test_streaming_table_cube",
+ "descriptor": "test_streaming_table_cube_desc",
"create_time_utc": 1433343833458,
"size_kb": 0,
"input_records_count": 0,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
index 7948d3a..fe4d885 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
@@ -3,12 +3,11 @@
"last_modified": 0,
"name": "test_streaming_table_ii_desc",
"model_name": "test_streaming_table_model_desc",
- "timestamp_dimension": "ts",
+ "timestamp_dimension": "minute_start",
"value_dimensions": [
{
"table": "default.streaming_table",
"columns": [
- "ts",
"minute_start",
"hour_start",
"day_start",
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
index c11bc4f..98b4218 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
@@ -1,12 +1,12 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "test_streaming_table_ii",
+ "name": "test_streaming_table_cube",
"topic": "test_streaming_table_topic_xyz",
"timeout": 60000,
"maxReadCount": 1000,
"bufferSize": 65536,
- "iiName": "test_streaming_table_ii",
- "parserName": "org.apache.kylin.streaming.JsonStreamParser",
+ "cubeName": "test_streaming_table_cube",
+ "parserName": "org.apache.kylin.streaming.TimedJsonStreamParser",
"partition": 1,
"last_modified": 0,
"clusters": [
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
index c4c2884..3033d91 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
@@ -35,7 +35,7 @@
{
"id": "7",
"name": "item_count",
- "datatype": "bigint"
+ "datatype": "int"
}
],
"database": "DEFAULT",
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index da2c711..df5d39c 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -2,20 +2,16 @@ package org.apache.kylin.job.streaming;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.streaming.BrokerConfig;
import org.apache.kylin.streaming.KafkaClusterConfig;
import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
import java.util.List;
import java.util.Properties;
@@ -24,10 +20,7 @@ import java.util.Properties;
*/
public class KafkaDataLoader {
- public static void loadIntoKafka(String streamName, List<String> messages) {
-
- StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
- StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streamName);
+ public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
@@ -46,20 +39,13 @@ public class KafkaDataLoader {
Producer<String, String> producer = new Producer<String, String>(config);
+ List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
for (int i = 0; i < messages.size(); ++i) {
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
- producer.send(data);
+ KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+ keyedMessages.add(keyedMessage);
}
+ producer.send(keyedMessages);
producer.close();
}
- /**
- *
- * @param args args[0] data file path, args[1] streaming name
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- List<String> alldata = FileUtils.readLines(new File(args[0]));
- loadIntoKafka(args[1], alldata);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 07f2b05..ae6b282 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -269,7 +269,6 @@ public class StreamingBootstrap {
OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
logger.info("one off build finished");
- System.exit(0);
}
private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index a39b273..196381e 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -18,20 +18,6 @@
package org.apache.kylin.job.tools;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -45,15 +31,25 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
/**
* @author yangli9
@@ -235,8 +231,7 @@ public class DeployCoprocessorCLI {
}
private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
- return fileStatus.getLen() == localCoprocessorFile.length() //
- && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+ return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
}
private static String getBaseFileName(String localCoprocessorJar) {
@@ -305,7 +300,7 @@ public class DeployCoprocessorCLI {
}
for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
- if(ii.getStatus()== RealizationStatusEnum.READY) {
+ if (ii.getStatus() == RealizationStatusEnum.READY) {
for (IISegment seg : ii.getSegments()) {//streaming segment is never "READY"
String tableName = seg.getStorageLocationIdentifier();
if (StringUtils.isBlank(tableName) == false) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 9a83154..54f5328 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -26,8 +26,8 @@ import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.cube.CubingJob;
@@ -118,7 +118,7 @@ public class BuildCubeWithEngineTest {
@Test
public void test() throws Exception {
- DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
+ DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
testInner();
testLeft();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 8defe1a..7a99661 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -40,9 +40,7 @@ import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
import org.apache.kylin.streaming.StreamingConfig;
import org.apache.kylin.streaming.StreamingManager;
import org.junit.After;
@@ -53,7 +51,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.List;
import java.util.UUID;
/**
@@ -64,8 +61,8 @@ public class BuildCubeWithStreamTest {
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
private static final String streamingName = "test_streaming_table_cube";
private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
- private static final long endTime = DateFormat.stringToMillis("2015-11-01 00:00:00");
- private static final long batchInterval = 12 * 60 * 60 * 1000;//12 hours
+ private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
+ private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
private KylinConfig kylinConfig;
@@ -89,13 +86,7 @@ public class BuildCubeWithStreamTest {
streamingConfig.setTopic(UUID.randomUUID().toString());
StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
- loadDataIntoKafka();
- }
-
- private void loadDataIntoKafka() {
- //10 day's data,sorted
- List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime);
- KafkaDataLoader.loadIntoKafka(streamingName, data);
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
}
@After
@@ -108,7 +99,7 @@ public class BuildCubeWithStreamTest {
for (long start = startTime; start < endTime; start += batchInterval) {
BootstrapConfig bootstrapConfig = new BootstrapConfig();
bootstrapConfig.setStart(start);
- bootstrapConfig.setEnd(start + endTime);
+ bootstrapConfig.setEnd(start + batchInterval);
bootstrapConfig.setMargin(0);
bootstrapConfig.setOneOff(true);
bootstrapConfig.setPartitionId(0);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index 7161f59..112b681 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -18,30 +18,17 @@
package org.apache.kylin.job;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Function;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.SortUtil;
import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.dataGen.StreamingDataGenerator;
import org.apache.kylin.metadata.MetadataManager;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
import static org.junit.Assert.assertTrue;
/**
+ *
*/
public class DataGenTest extends LocalFileMetadataTestCase {
@@ -58,7 +45,7 @@ public class DataGenTest extends LocalFileMetadataTestCase {
@Test
public void testBasics() throws Exception {
- String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1", null, "inner");// default settings
+ String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1",null);// default settings
System.out.println(content);
assertTrue(content.contains("FP-non GTC"));
assertTrue(content.contains("ABIN"));
@@ -66,43 +53,4 @@ public class DataGenTest extends LocalFileMetadataTestCase {
DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
}
- @Test
- public void testStreaming() throws Exception {
- int totalCount = 10000;
- int counter = 0;
-
- Iterator<String> iterator = StreamingDataGenerator.generate(DateFormat.stringToMillis("2015-01-03"), DateFormat.stringToMillis("2015-02-05"), totalCount);
-
- iterator = SortUtil.extractAndSort(iterator, new Function<String, Comparable>() {
- public Comparable apply(String input) {
- return getTsStr(input);
- }
- });
-
- //FileUtils.writeLines(new File("//Users/honma/streaming_table_data"),Lists.newArrayList(iterator));
-
- long lastTs = 0;
- while (iterator.hasNext()) {
- counter++;
- String row = iterator.next();
- System.out.println(row);
- long ts = Long.parseLong(getTsStr(row));
- Assert.assertTrue(ts >= lastTs);
- lastTs = ts;
- }
- Assert.assertEquals(totalCount, counter);
- }
-
- final JavaType javaType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
- final ObjectMapper objectMapper = new ObjectMapper();
-
- private String getTsStr(String input) {
- Map<String, String> json ;
- try {
- json = objectMapper.readValue(input.getBytes(), javaType);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return json.get("ts");
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
new file mode 100644
index 0000000..276cf67
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.*;
+
+import java.io.File;
+
+/**
+ * This test case is ONLY for dev use; it deploys the local metadata to the sandbox
+ */
+@Ignore("dev use only")
+public class DeployLocalMetaToRemoteTest {
+
+ private static final Log logger = LogFactory.getLog(DeployLocalMetaToRemoteTest.class);
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+ System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+ }
+
+ @Before
+ public void before() throws Exception {
+ HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+ DeployUtil.initCliWorkDir();
+ DeployUtil.deployMetadata();
+ DeployUtil.overrideJobJarLocations();
+
+ }
+
+ @After
+ public void after() {
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws Exception {
+ System.out.println("blank");
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index cff78dc..e80df58 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
@@ -33,15 +34,22 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.dataGen.FactTableGenerator;
import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.TimedJsonStreamParser;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.util.List;
public class DeployUtil {
@SuppressWarnings("unused")
@@ -138,7 +146,7 @@ public class DeployUtil {
static final String[] TABLE_NAMES = new String[] { TABLE_CAL_DT, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, TABLE_SELLER_TYPE_DIM, TABLE_SITES };
- public static void prepareTestData(String joinType, String cubeName) throws Exception {
+ public static void prepareTestDataForNormalCubes(String cubeName) throws Exception {
String factTableName = TABLE_KYLIN_FACT.toUpperCase();
String content = null;
@@ -147,24 +155,40 @@ public class DeployUtil {
if (!buildCubeUsingProvidedData) {
System.out.println("build cube with random dataset");
// data is generated according to cube descriptor and saved in resource store
- if (joinType.equalsIgnoreCase("inner")) {
- content = FactTableGenerator.generate(cubeName, "10000", "1", null, "inner");
- } else if (joinType.equalsIgnoreCase("left")) {
- content = FactTableGenerator.generate(cubeName, "10000", "0.6", null, "left");
- } else {
- throw new IllegalArgumentException("Unsupported join type : " + joinType);
- }
-
+ content = FactTableGenerator.generate(cubeName, "10000", "0.6", null);
assert content != null;
overrideFactTableData(content, factTableName);
} else {
- System.out.println("build cube with provided dataset");
+ System.out.println("build normal cubes with provided dataset");
}
- duplicateFactTableData(factTableName, joinType);
deployHiveTables();
}
+ public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
+ MetadataManager metadataManager = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+ List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
+ TableDesc tableDesc = metadataManager.getTableDesc(cubeInstance.getFactTable());
+
+ //load into kafka
+ KafkaDataLoader.loadIntoKafka(streamingConfig, data);
+
+ //csv data for H2 use
+ List<TblColRef> tableColumns = Lists.newArrayList();
+ for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+ tableColumns.add(new TblColRef(columnDesc));
+ }
+ TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, true);
+ StringBuilder sb = new StringBuilder();
+ for (String json : data) {
+ List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
+ sb.append(StringUtils.join(rowColumns, ","));
+ sb.append(System.getProperty("line.separator"));
+ }
+ overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
+ }
+
public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
// Write to resource store
ResourceStore store = ResourceStore.getStore(config());
@@ -176,17 +200,6 @@ public class DeployUtil {
in.close();
}
- public static void duplicateFactTableData(String factTableName, String joinType) throws IOException {
- // duplicate a copy of this fact table, with a naming convention with fact.csv.inner or fact.csv.left
- // so that later test cases can select different data files
- ResourceStore store = ResourceStore.getStore(config());
- InputStream in = store.getResource("/data/" + factTableName + ".csv");
- String factTablePathWithJoinType = "/data/" + factTableName + ".csv." + joinType.toLowerCase();
- store.deleteResource(factTablePathWithJoinType);
- store.putResource(factTablePathWithJoinType, in, System.currentTimeMillis());
- in.close();
- }
-
private static void deployHiveTables() throws Exception {
MetadataManager metaMgr = MetadataManager.getInstance(config());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 5827946..995d314 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -603,7 +603,7 @@ public class FactTableGenerator {
* lookup table by INNER join
* @param randomSeed random seed
*/
- public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed, String joinType) throws Exception {
+ public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed) throws Exception {
if (rowCount == null)
rowCount = "10000";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
index c303ebf..ed2f3cc 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
@@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Random;
/**
+ * data gen for II streaming, may be merged with StreamingTableDataGenerator
*/
public class StreamingDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
@@ -27,7 +28,7 @@ public class StreamingDataGenerator {
public static Iterator<String> generate(final long start, final long end, final int count) {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
- final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table");
+ final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table_ii");
final IIDesc iiDesc = ii.getDescriptor();
final List<TblColRef> columns = iiDesc.listAllColumns();
@@ -44,9 +45,9 @@ public class StreamingDataGenerator {
public String next() {
values.clear();
long ts = this.createTs(start, end);
- values.put("ts", Long.toString(ts));
values.put("minute_start", Long.toString(TimeUtil.getMinuteStart(ts)));
values.put("hour_start", Long.toString(TimeUtil.getHourStart(ts)));
+ values.put("day_start",Long.toString(TimeUtil.getDayStart(ts)));
values.put("itm", Integer.toString(random.nextInt(20)));
values.put("site", Integer.toString(random.nextInt(5)));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
index 394b56f..0a93469 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -20,19 +20,19 @@ import java.util.List;
import java.util.Random;
/**
- * this is for generating fact table data for test_streaming_table
+ * this is for generating fact table data for test_streaming_table (cube streaming)
*/
public class StreamingTableDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
private static final ObjectMapper mapper = new ObjectMapper();
- public static List<String> generate(int recordCount, long startTime, long endTime) {
+ public static List<String> generate(int recordCount, long startTime, long endTime, String tableName) {
Preconditions.checkArgument(startTime < endTime);
Preconditions.checkArgument(recordCount > 0);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc("streaming_table");
+ TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
SortedMultiset<Long> times = TreeMultiset.create();
Random r = new Random();
@@ -47,24 +47,26 @@ public class StreamingTableDataGenerator {
kvs.clear();
kvs.put("timestamp", String.valueOf(time));
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+ String lowerCaseColumnName = columnDesc.getName().toLowerCase();
DataType dataType = columnDesc.getType();
if (dataType.isDateTimeFamily()) {
+ //TimedJsonStreamParser will derive minute_start, hour_start and day_start from timestamp
continue;
} else if (dataType.isStringFamily()) {
char c = (char) ('A' + (int) (26 * r.nextDouble()));
- kvs.put(columnDesc.getName(), String.valueOf(c));
+ kvs.put(lowerCaseColumnName, String.valueOf(c));
} else if (dataType.isIntegerFamily()) {
int v = r.nextInt(10000);
- kvs.put(columnDesc.getName(), String.valueOf(v));
+ kvs.put(lowerCaseColumnName, String.valueOf(v));
} else if (dataType.isNumberFamily()) {
String v = String.format("%.4f", r.nextDouble() * 100);
- kvs.put(columnDesc.getName(), v);
+ kvs.put(lowerCaseColumnName, v);
}
}
try {
ret.add(mapper.writeValueAsString(kvs));
} catch (JsonProcessingException e) {
- logger.error("error!",e);
+ logger.error("error!", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/java/org/apache/kylin/query/test/H2Database.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/H2Database.java b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
index ed0c13f..c807bfe 100644
--- a/query/src/test/java/org/apache/kylin/query/test/H2Database.java
+++ b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
@@ -18,6 +18,13 @@
package org.apache.kylin.query.test;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -28,18 +35,10 @@ import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-
public class H2Database {
private static final Logger logger = LoggerFactory.getLogger(H2Database.class);
- private static final String[] ALL_TABLES = new String[] { "edw.test_cal_dt", "default.test_category_groupings", "default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites" };
+ private static final String[] ALL_TABLES = new String[] { "edw.test_cal_dt", "default.test_category_groupings", "default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites", "default.streaming_table" };
private static final Map<String, String> javaToH2DataTypeMapping = new HashMap<String, String>();
static {
@@ -58,33 +57,22 @@ public class H2Database {
this.config = config;
}
- public void loadAllTables(String joinType) throws SQLException {
+ public void loadAllTables() throws SQLException {
for (String tableName : ALL_TABLES) {
- loadH2Table(tableName, joinType);
+ loadH2Table(tableName);
}
}
- private void loadH2Table(String tableName, String joinType) throws SQLException {
+ private void loadH2Table(String tableName) throws SQLException {
MetadataManager metaMgr = MetadataManager.getInstance(config);
TableDesc tableDesc = metaMgr.getTableDesc(tableName.toUpperCase());
File tempFile = null;
- String fileNameSuffix = joinType.equalsIgnoreCase("default") ? "" : "." + joinType;
-
try {
tempFile = File.createTempFile("tmp_h2", ".csv");
FileOutputStream tempFileStream = new FileOutputStream(tempFile);
String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
-
- // If it's the fact table, there will be a facttable.csv.inner or
- // facttable.csv.left in hbase
- // otherwise just use lookup.csv
- InputStream csvStream = metaMgr.getStore().getResource(normalPath + fileNameSuffix);
- if (csvStream == null) {
- csvStream = metaMgr.getStore().getResource(normalPath);
- } else {
- logger.info("H2 decides to load " + (normalPath + fileNameSuffix) + " for table " + tableDesc.getIdentity());
- }
+ InputStream csvStream = metaMgr.getStore().getResource(normalPath);
org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 5e14471..a595dc9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -73,7 +73,7 @@ public class ITKylinQueryTest extends KylinTestBase {
h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
// Load H2 Tables (inner join)
H2Database h2DB = new H2Database(h2Connection, config);
- h2DB.loadAllTables(joinType);
+ h2DB.loadAllTables();
}
protected static void clean() {
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
@Test
public void testSingleRunQuery() throws Exception {
- String queryFileName = "src/test/resources/query/sql/query00.sql";
+ String queryFileName = "src/test/resources/query/sql_streaming/query07.sql";
File sqlFile = new File(queryFileName);
if (sqlFile.exists()) {
@@ -163,6 +163,11 @@ public class ITKylinQueryTest extends KylinTestBase {
}
@Test
+ public void testStreamingTableQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_streaming",null,true);
+ }
+
+ @Test
public void testTableauQuery() throws Exception {
batchExecuteQuery("src/test/resources/query/sql_tableau");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query01.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query01.sql b/query/src/test/resources/query/sql_streaming/query01.sql
new file mode 100644
index 0000000..8a5f302
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query01.sql
@@ -0,0 +1 @@
+select count(*) AS c from streaming_table
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query02.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query02.sql b/query/src/test/resources/query/sql_streaming/query02.sql
new file mode 100644
index 0000000..0fc0f88
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query02.sql
@@ -0,0 +1 @@
+select count(*) AS c ,cast(sum(item_count) as BIGINT) as i from streaming_table group by site,day_start
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query03.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query03.sql b/query/src/test/resources/query/sql_streaming/query03.sql
new file mode 100644
index 0000000..33ab8cb
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query03.sql
@@ -0,0 +1 @@
+select count(*) as c ,hour_start AS h from streaming_table group by hour_start
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query04.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query04.sql b/query/src/test/resources/query/sql_streaming/query04.sql
new file mode 100644
index 0000000..5117c37
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query04.sql
@@ -0,0 +1 @@
+select count(*) as c,sum(gmv) as g,cast(sum(item_count) as BIGINT) AS i from streaming_table group by minute_start
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query05.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query05.sql b/query/src/test/resources/query/sql_streaming/query05.sql
new file mode 100644
index 0000000..566046a
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query05.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where day_start >= DATE'2015-01-02'
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query06.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query06.sql b/query/src/test/resources/query/sql_streaming/query06.sql
new file mode 100644
index 0000000..ceabc91
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query06.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start >= {TS '2015-01-02 20:00:00'}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query07.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query07.sql b/query/src/test/resources/query/sql_streaming/query07.sql
new file mode 100644
index 0000000..b09759d
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query07.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start < {TS '2015-01-02 21:00:00'}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query08.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query08.sql b/query/src/test/resources/query/sql_streaming/query08.sql
new file mode 100644
index 0000000..120ef50
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query08.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start < {TS '2015-01-02 21:00:00'} and minute_start > {TS '2015-01-02 20:00:00'}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query09.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query09.sql b/query/src/test/resources/query/sql_streaming/query09.sql
new file mode 100644
index 0000000..0d29d92
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query09.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start >= {TS '2015-01-02 20:30:00'}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query10.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query10.sql b/query/src/test/resources/query/sql_streaming/query10.sql
new file mode 100644
index 0000000..c92add2
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query10.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start >= {TS '2015-01-01 20:30:00'}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index e683fbc..6d77fa2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -18,15 +18,12 @@
package org.apache.kylin.storage.hbase;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
@@ -35,10 +32,8 @@ import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.common.util.DateFormat;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.*;
/**
*
@@ -135,10 +130,10 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
private void initPartitionRange(ColumnValueRange dimRange) {
if (null != dimRange.getBeginValue()) {
- this.partitionColumnStartDate = DateFormat.stringToDate(dimRange.getBeginValue()).getTime();
+ this.partitionColumnStartDate = DateFormat.stringToMillis(dimRange.getBeginValue());
}
if (null != dimRange.getEndValue()) {
- this.partitionColumnEndDate = DateFormat.stringToDate(dimRange.getEndValue()).getTime();
+ this.partitionColumnEndDate = DateFormat.stringToMillis(dimRange.getEndValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 2c1e728..6086a55 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -69,7 +69,7 @@ public final class StreamingUtil {
final String topic = kafkaClusterConfig.getTopic();
final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
- final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
+ final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
final long result = binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
@@ -83,8 +83,8 @@ public final class StreamingUtil {
long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser);
// hard to ensure these 2 conditions
-// Preconditions.checkArgument(startTimestamp <= midTimestamp);
-// Preconditions.checkArgument(midTimestamp <= endTimestamp);
+ // Preconditions.checkArgument(startTimestamp <= midTimestamp);
+ // Preconditions.checkArgument(midTimestamp <= endTimestamp);
if (startTimestamp >= targetTimestamp) {
return startOffset;
}
@@ -103,6 +103,5 @@ public final class StreamingUtil {
}
return startOffset;
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
new file mode 100644
index 0000000..cba52e8
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
@@ -0,0 +1,113 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses a JSON stream message carrying a "timestamp" field into one string value per column: minute_start, hour_start and day_start are derived from the timestamp, every other column is read from the JSON map by its lower-cased name.
+ */
+public final class TimedJsonStreamParser implements StreamParser {
+
+ private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
+ private final List<TblColRef> allColumns;
+ private boolean formatTs; // when true, minute_start/hour_start go through DateFormat.formatToTimeStr instead of being emitted as raw long values
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); // messages are deserialized as HashMap<String, String>
+
+ public TimedJsonStreamParser(List<TblColRef> allColumns) {
+ this.allColumns = allColumns;
+ this.formatTs = false; // default: minute_start/hour_start are emitted as raw long values
+ }
+
+ public TimedJsonStreamParser(List<TblColRef> allColumns, boolean formatTs) {
+ this.allColumns = allColumns;
+ this.formatTs = formatTs;
+ }
+
+ @Override
+ public ParsedStreamMessage parse(StreamMessage stream) {
+ try {
+ Map<String, String> root = mapper.readValue(stream.getRawData(), mapType);
+ String tsStr = root.get("timestamp");
+ Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field cannot be null");
+ long t = Long.valueOf(root.get("timestamp")); // NOTE(review): presumably epoch milliseconds -- confirm against TimeUtil
+ ArrayList<String> result = Lists.newArrayList(); // one entry per column, in allColumns order
+
+ for (TblColRef column : allColumns) {
+ String columnName = column.getName();
+ if (columnName.equalsIgnoreCase("minute_start")) {
+ long minuteStart = TimeUtil.getMinuteStart(t);
+ result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+ } else if (columnName.equalsIgnoreCase("hour_start")) {
+ long hourStart = TimeUtil.getHourStart(t);
+ result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+ } else if (columnName.equalsIgnoreCase("day_start")) {
+ // day_start is always emitted through DateFormat.formatToDateStr, regardless of formatTs (presumably yyyy-MM-dd -- confirm)
+ long ts = TimeUtil.getDayStart(t);
+ result.add(DateFormat.formatToDateStr(ts));
+ } else {
+ String x = root.get(columnName.toLowerCase()); // JSON keys are looked up lower-cased; an absent key adds null to the result
+ result.add(x);
+ }
+ }
+
+ return new ParsedStreamMessage(result, stream.getOffset(), t, true);
+
+ } catch (IOException e) {
+ logger.error("error", e);
+ throw new RuntimeException(e); // the IOException is logged, then rethrown unchecked with its cause preserved
+ }
+ }
+
+}
[2/2] incubator-kylin git commit: Merge branch 'origin/0.8'
Posted by ma...@apache.org.
Merge branch 'origin/0.8'
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/49e722ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/49e722ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/49e722ec
Branch: refs/heads/0.8
Commit: 49e722ec284a31afa79ee364c892be14f3459ff7
Parents: 8c13aed 22cdfa2
Author: honma <ho...@ebay.com>
Authored: Tue Jun 16 19:11:10 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 16 19:11:10 2015 +0800
----------------------------------------------------------------------
bin/check-sandbox-properties.sh | 9 +++++++++
bin/kylin.sh | 4 ++--
.../main/java/org/apache/kylin/dict/DictionaryManager.java | 3 ++-
3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------