You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/16 13:14:41 UTC

[1/2] incubator-kylin git commit: KYLIN-826 streaming table case build and verify result in KylinQueryTest

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 22cdfa2a8 -> 49e722ec2


KYLIN-826 streaming table case build and verify result in KylinQueryTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8c13aed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8c13aed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8c13aed0

Branch: refs/heads/0.8
Commit: 8c13aed0a92e21a399d08ed96c74e03e8dac5abe
Parents: c48962d
Author: honma <ho...@ebay.com>
Authored: Tue Jun 16 19:09:58 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 16 19:09:58 2015 +0800

----------------------------------------------------------------------
 .../cube/test_streaming_table_cube.json         |   2 +-
 .../test_streaming_table_ii_desc.json           |   3 +-
 .../streaming/test_streaming_table_cube.json    |   6 +-
 .../localmeta/table/DEFAULT.SREAMING_TABLE.json |   2 +-
 .../kylin/job/streaming/KafkaDataLoader.java    |  26 +----
 .../kylin/job/streaming/StreamingBootstrap.java |   1 -
 .../kylin/job/tools/DeployCoprocessorCLI.java   |  37 +++---
 .../kylin/job/BuildCubeWithEngineTest.java      |   4 +-
 .../kylin/job/BuildCubeWithStreamTest.java      |  17 +--
 .../java/org/apache/kylin/job/DataGenTest.java  |  56 +--------
 .../kylin/job/DeployLocalMetaToRemoteTest.java  |  67 +++++++++++
 .../java/org/apache/kylin/job/DeployUtil.java   |  57 ++++++----
 .../kylin/job/dataGen/FactTableGenerator.java   |   2 +-
 .../job/dataGen/StreamingDataGenerator.java     |   5 +-
 .../streaming/StreamingTableDataGenerator.java  |  16 +--
 .../org/apache/kylin/query/test/H2Database.java |  36 ++----
 .../kylin/query/test/ITKylinQueryTest.java      |   9 +-
 .../resources/query/sql_streaming/query01.sql   |   1 +
 .../resources/query/sql_streaming/query02.sql   |   1 +
 .../resources/query/sql_streaming/query03.sql   |   1 +
 .../resources/query/sql_streaming/query04.sql   |   1 +
 .../resources/query/sql_streaming/query05.sql   |   1 +
 .../resources/query/sql_streaming/query06.sql   |   1 +
 .../resources/query/sql_streaming/query07.sql   |   1 +
 .../resources/query/sql_streaming/query08.sql   |   1 +
 .../resources/query/sql_streaming/query09.sql   |   1 +
 .../resources/query/sql_streaming/query10.sql   |   1 +
 .../kylin/storage/hbase/HBaseKeyRange.java      |  19 ++--
 .../apache/kylin/streaming/StreamingUtil.java   |   7 +-
 .../kylin/streaming/TimedJsonStreamParser.java  | 113 +++++++++++++++++++
 30 files changed, 303 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json b/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
index 75fc09d..d0cf0d5 100644
--- a/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
+++ b/examples/test_case_data/localmeta/cube/test_streaming_table_cube.json
@@ -7,7 +7,7 @@
   "status": "READY",
   "segments": [],
   "last_modified": 1433474940111,
-  "descriptor": "test_streaming_table_cube",
+  "descriptor": "test_streaming_table_cube_desc",
   "create_time_utc": 1433343833458,
   "size_kb": 0,
   "input_records_count": 0,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
index 7948d3a..fe4d885 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_streaming_table_ii_desc.json
@@ -3,12 +3,11 @@
   "last_modified": 0,
   "name": "test_streaming_table_ii_desc",
   "model_name": "test_streaming_table_model_desc",
-  "timestamp_dimension": "ts",
+  "timestamp_dimension": "minute_start",
   "value_dimensions": [
     {
       "table": "default.streaming_table",
       "columns": [
-        "ts",
         "minute_start",
         "hour_start",
         "day_start",

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
index c11bc4f..98b4218 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
@@ -1,12 +1,12 @@
 {
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "test_streaming_table_ii",
+  "name": "test_streaming_table_cube",
   "topic": "test_streaming_table_topic_xyz",
   "timeout": 60000,
   "maxReadCount": 1000,
   "bufferSize": 65536,
-  "iiName": "test_streaming_table_ii",
-  "parserName": "org.apache.kylin.streaming.JsonStreamParser",
+  "cubeName": "test_streaming_table_cube",
+  "parserName": "org.apache.kylin.streaming.TimedJsonStreamParser",
   "partition": 1,
   "last_modified": 0,
   "clusters": [

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
index c4c2884..3033d91 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.SREAMING_TABLE.json
@@ -35,7 +35,7 @@
     {
       "id": "7",
       "name": "item_count",
-      "datatype": "bigint"
+      "datatype": "int"
     }
   ],
   "database": "DEFAULT",

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index da2c711..df5d39c 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -2,20 +2,16 @@ package org.apache.kylin.job.streaming;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.streaming.BrokerConfig;
 import org.apache.kylin.streaming.KafkaClusterConfig;
 import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
 
 import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
@@ -24,10 +20,7 @@ import java.util.Properties;
  */
 public class KafkaDataLoader {
 
-    public static void loadIntoKafka(String streamName, List<String> messages) {
-
-        StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
-        StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streamName);
+    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
 
         KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
         String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
@@ -46,20 +39,13 @@ public class KafkaDataLoader {
 
         Producer<String, String> producer = new Producer<String, String>(config);
 
+        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
         for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
-            producer.send(data);
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+            keyedMessages.add(keyedMessage);
         }
+        producer.send(keyedMessages);
         producer.close();
     }
 
-    /**
-     *
-     * @param args args[0] data file path, args[1] streaming name
-     * @throws IOException
-     */
-    public static void main(String[] args) throws IOException {
-        List<String> alldata = FileUtils.readLines(new File(args[0]));
-        loadIntoKafka(args[1], alldata);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 07f2b05..ae6b282 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -269,7 +269,6 @@ public class StreamingBootstrap {
         OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
         Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
         logger.info("one off build finished");
-        System.exit(0);
     }
 
     private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index a39b273..196381e 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -18,20 +18,6 @@
 
 package org.apache.kylin.job.tools;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -45,15 +31,25 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
 
 /**
  * @author yangli9
@@ -235,8 +231,7 @@ public class DeployCoprocessorCLI {
     }
 
     private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
-        return fileStatus.getLen() == localCoprocessorFile.length() //
-                && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+        return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
     }
 
     private static String getBaseFileName(String localCoprocessorJar) {
@@ -305,7 +300,7 @@ public class DeployCoprocessorCLI {
         }
 
         for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
-            if(ii.getStatus()== RealizationStatusEnum.READY) {
+            if (ii.getStatus() == RealizationStatusEnum.READY) {
                 for (IISegment seg : ii.getSegments()) {//streaming segment is never "READY"
                     String tableName = seg.getStorageLocationIdentifier();
                     if (StringUtils.isBlank(tableName) == false) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 9a83154..54f5328 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -26,8 +26,8 @@ import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.cube.CubingJob;
@@ -118,7 +118,7 @@ public class BuildCubeWithEngineTest {
 
     @Test
     public void test() throws Exception {
-        DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
+        DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
         testInner();
         testLeft();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 8defe1a..7a99661 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -40,9 +40,7 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
 import org.apache.kylin.streaming.StreamingConfig;
 import org.apache.kylin.streaming.StreamingManager;
 import org.junit.After;
@@ -53,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.List;
 import java.util.UUID;
 
 /**
@@ -64,8 +61,8 @@ public class BuildCubeWithStreamTest {
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
     private static final String streamingName = "test_streaming_table_cube";
     private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
-    private static final long endTime = DateFormat.stringToMillis("2015-11-01 00:00:00");
-    private static final long batchInterval = 12 * 60 * 60 * 1000;//12 hours
+    private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
+    private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
 
     private KylinConfig kylinConfig;
 
@@ -89,13 +86,7 @@ public class BuildCubeWithStreamTest {
         streamingConfig.setTopic(UUID.randomUUID().toString());
         StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
 
-        loadDataIntoKafka();
-    }
-
-    private void loadDataIntoKafka() {
-        //10 day's data,sorted
-        List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime);
-        KafkaDataLoader.loadIntoKafka(streamingName, data);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
     }
 
     @After
@@ -108,7 +99,7 @@ public class BuildCubeWithStreamTest {
         for (long start = startTime; start < endTime; start += batchInterval) {
             BootstrapConfig bootstrapConfig = new BootstrapConfig();
             bootstrapConfig.setStart(start);
-            bootstrapConfig.setEnd(start + endTime);
+            bootstrapConfig.setEnd(start + batchInterval);
             bootstrapConfig.setMargin(0);
             bootstrapConfig.setOneOff(true);
             bootstrapConfig.setPartitionId(0);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index 7161f59..112b681 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -18,30 +18,17 @@
 
 package org.apache.kylin.job;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Function;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.SortUtil;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.dataGen.StreamingDataGenerator;
 import org.apache.kylin.metadata.MetadataManager;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
 import static org.junit.Assert.assertTrue;
 
 /**
+ *
  */
 public class DataGenTest extends LocalFileMetadataTestCase {
 
@@ -58,7 +45,7 @@ public class DataGenTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testBasics() throws Exception {
-        String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1", null, "inner");// default  settings
+        String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1",null);// default  settings
         System.out.println(content);
         assertTrue(content.contains("FP-non GTC"));
         assertTrue(content.contains("ABIN"));
@@ -66,43 +53,4 @@ public class DataGenTest extends LocalFileMetadataTestCase {
         DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
     }
 
-    @Test
-    public void testStreaming() throws Exception {
-        int totalCount = 10000;
-        int counter = 0;
-
-        Iterator<String> iterator = StreamingDataGenerator.generate(DateFormat.stringToMillis("2015-01-03"), DateFormat.stringToMillis("2015-02-05"), totalCount);
-
-        iterator = SortUtil.extractAndSort(iterator, new Function<String, Comparable>() {
-            public Comparable apply(String input) {
-                return getTsStr(input);
-            }
-        });
-
-        //FileUtils.writeLines(new File("//Users/honma/streaming_table_data"),Lists.newArrayList(iterator));
-
-        long lastTs = 0;
-        while (iterator.hasNext()) {
-            counter++;
-            String row = iterator.next();
-            System.out.println(row);
-            long ts = Long.parseLong(getTsStr(row));
-            Assert.assertTrue(ts >= lastTs);
-            lastTs = ts;
-        }
-        Assert.assertEquals(totalCount, counter);
-    }
-
-    final JavaType javaType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-    final ObjectMapper objectMapper = new ObjectMapper();
-
-    private String getTsStr(String input) {
-        Map<String, String> json ;
-        try {
-            json = objectMapper.readValue(input.getBytes(), javaType);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return json.get("ts");
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
new file mode 100644
index 0000000..276cf67
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.*;
+
+import java.io.File;
+
+/**
+ * This test case is ONLY for dev use, it deploys local meta to sandbox
+ */
+@Ignore("dev use only")
+public class DeployLocalMetaToRemoteTest {
+
+    private static final Log logger = LogFactory.getLog(DeployLocalMetaToRemoteTest.class);
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        System.out.println("blank");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index cff78dc..e80df58 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
@@ -33,15 +34,22 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
 import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.TimedJsonStreamParser;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.util.List;
 
 public class DeployUtil {
     @SuppressWarnings("unused")
@@ -138,7 +146,7 @@ public class DeployUtil {
 
     static final String[] TABLE_NAMES = new String[] { TABLE_CAL_DT, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, TABLE_SELLER_TYPE_DIM, TABLE_SITES };
 
-    public static void prepareTestData(String joinType, String cubeName) throws Exception {
+    public static void prepareTestDataForNormalCubes(String cubeName) throws Exception {
 
         String factTableName = TABLE_KYLIN_FACT.toUpperCase();
         String content = null;
@@ -147,24 +155,40 @@ public class DeployUtil {
         if (!buildCubeUsingProvidedData) {
             System.out.println("build cube with random dataset");
             // data is generated according to cube descriptor and saved in resource store
-            if (joinType.equalsIgnoreCase("inner")) {
-                content = FactTableGenerator.generate(cubeName, "10000", "1", null, "inner");
-            } else if (joinType.equalsIgnoreCase("left")) {
-                content = FactTableGenerator.generate(cubeName, "10000", "0.6", null, "left");
-            } else {
-                throw new IllegalArgumentException("Unsupported join type : " + joinType);
-            }
-
+            content = FactTableGenerator.generate(cubeName, "10000", "0.6", null);
             assert content != null;
             overrideFactTableData(content, factTableName);
         } else {
-            System.out.println("build cube with provided dataset");
+            System.out.println("build normal cubes with provided dataset");
         }
 
-        duplicateFactTableData(factTableName, joinType);
         deployHiveTables();
     }
 
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
+        MetadataManager metadataManager = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+        List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
+        TableDesc tableDesc = metadataManager.getTableDesc(cubeInstance.getFactTable());
+
+        //load into kafka
+        KafkaDataLoader.loadIntoKafka(streamingConfig, data);
+
+        //csv data for H2 use
+        List<TblColRef> tableColumns = Lists.newArrayList();
+        for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+            tableColumns.add(new TblColRef(columnDesc));
+        }
+        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, true);
+        StringBuilder sb = new StringBuilder();
+        for (String json : data) {
+            List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
+            sb.append(StringUtils.join(rowColumns, ","));
+            sb.append(System.getProperty("line.separator"));
+        }
+        overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
+    }
+
     public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
         // Write to resource store
         ResourceStore store = ResourceStore.getStore(config());
@@ -176,17 +200,6 @@ public class DeployUtil {
         in.close();
     }
 
-    public static void duplicateFactTableData(String factTableName, String joinType) throws IOException {
-        // duplicate a copy of this fact table, with a naming convention with fact.csv.inner or fact.csv.left
-        // so that later test cases can select different data files
-        ResourceStore store = ResourceStore.getStore(config());
-        InputStream in = store.getResource("/data/" + factTableName + ".csv");
-        String factTablePathWithJoinType = "/data/" + factTableName + ".csv." + joinType.toLowerCase();
-        store.deleteResource(factTablePathWithJoinType);
-        store.putResource(factTablePathWithJoinType, in, System.currentTimeMillis());
-        in.close();
-    }
-
     private static void deployHiveTables() throws Exception {
 
         MetadataManager metaMgr = MetadataManager.getInstance(config());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 5827946..995d314 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -603,7 +603,7 @@ public class FactTableGenerator {
      *                      lookup table by INNER join
      * @param randomSeed    random seed
      */
-    public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed, String joinType) throws Exception {
+    public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed) throws Exception {
 
         if (rowCount == null)
             rowCount = "10000";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
index c303ebf..ed2f3cc 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
@@ -19,6 +19,7 @@ import java.util.Map;
 import java.util.Random;
 
 /**
+ * Data generator for II streaming; may be merged with StreamingTableDataGenerator in the future.
  */
 public class StreamingDataGenerator {
     private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
@@ -27,7 +28,7 @@ public class StreamingDataGenerator {
 
     public static Iterator<String> generate(final long start, final long end, final int count) {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
-        final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table");
+        final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table_ii");
         final IIDesc iiDesc = ii.getDescriptor();
         final List<TblColRef> columns = iiDesc.listAllColumns();
 
@@ -44,9 +45,9 @@ public class StreamingDataGenerator {
             public String next() {
                 values.clear();
                 long ts = this.createTs(start, end);
-                values.put("ts", Long.toString(ts));
                 values.put("minute_start", Long.toString(TimeUtil.getMinuteStart(ts)));
                 values.put("hour_start", Long.toString(TimeUtil.getHourStart(ts)));
+                values.put("day_start",Long.toString(TimeUtil.getDayStart(ts)));
                 values.put("itm", Integer.toString(random.nextInt(20)));
                 values.put("site", Integer.toString(random.nextInt(5)));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
index 394b56f..0a93469 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -20,19 +20,19 @@ import java.util.List;
 import java.util.Random;
 
 /**
- * this is for generating fact table data for test_streaming_table
+ * this is for generating fact table data for test_streaming_table (cube streaming)
  */
 public class StreamingTableDataGenerator {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
 
-    public static List<String> generate(int recordCount, long startTime, long endTime)  {
+    public static List<String> generate(int recordCount, long startTime, long endTime, String tableName) {
         Preconditions.checkArgument(startTime < endTime);
         Preconditions.checkArgument(recordCount > 0);
 
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc("streaming_table");
+        TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
 
         SortedMultiset<Long> times = TreeMultiset.create();
         Random r = new Random();
@@ -47,24 +47,26 @@ public class StreamingTableDataGenerator {
             kvs.clear();
             kvs.put("timestamp", String.valueOf(time));
             for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+                String lowerCaseColumnName = columnDesc.getName().toLowerCase();
                 DataType dataType = columnDesc.getType();
                 if (dataType.isDateTimeFamily()) {
+                    // TimedJsonStreamParser will derive minute_start, hour_start and day_start from timestamp
                     continue;
                 } else if (dataType.isStringFamily()) {
                     char c = (char) ('A' + (int) (26 * r.nextDouble()));
-                    kvs.put(columnDesc.getName(), String.valueOf(c));
+                    kvs.put(lowerCaseColumnName, String.valueOf(c));
                 } else if (dataType.isIntegerFamily()) {
                     int v = r.nextInt(10000);
-                    kvs.put(columnDesc.getName(), String.valueOf(v));
+                    kvs.put(lowerCaseColumnName, String.valueOf(v));
                 } else if (dataType.isNumberFamily()) {
                     String v = String.format("%.4f", r.nextDouble() * 100);
-                    kvs.put(columnDesc.getName(), v);
+                    kvs.put(lowerCaseColumnName, v);
                 }
             }
             try {
                 ret.add(mapper.writeValueAsString(kvs));
             } catch (JsonProcessingException e) {
-                logger.error("error!",e);
+                logger.error("error!", e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/java/org/apache/kylin/query/test/H2Database.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/H2Database.java b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
index ed0c13f..c807bfe 100644
--- a/query/src/test/java/org/apache/kylin/query/test/H2Database.java
+++ b/query/src/test/java/org/apache/kylin/query/test/H2Database.java
@@ -18,6 +18,13 @@
 
 package org.apache.kylin.query.test;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -28,18 +35,10 @@ import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-
 public class H2Database {
     private static final Logger logger = LoggerFactory.getLogger(H2Database.class);
 
-    private static final String[] ALL_TABLES = new String[] { "edw.test_cal_dt", "default.test_category_groupings", "default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites" };
+    private static final String[] ALL_TABLES = new String[] { "edw.test_cal_dt", "default.test_category_groupings", "default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites", "default.streaming_table" };
     private static final Map<String, String> javaToH2DataTypeMapping = new HashMap<String, String>();
 
     static {
@@ -58,33 +57,22 @@ public class H2Database {
         this.config = config;
     }
 
-    public void loadAllTables(String joinType) throws SQLException {
+    public void loadAllTables() throws SQLException {
         for (String tableName : ALL_TABLES) {
-            loadH2Table(tableName, joinType);
+            loadH2Table(tableName);
         }
     }
 
-    private void loadH2Table(String tableName, String joinType) throws SQLException {
+    private void loadH2Table(String tableName) throws SQLException {
         MetadataManager metaMgr = MetadataManager.getInstance(config);
         TableDesc tableDesc = metaMgr.getTableDesc(tableName.toUpperCase());
         File tempFile = null;
 
-        String fileNameSuffix = joinType.equalsIgnoreCase("default") ? "" : "." + joinType;
-
         try {
             tempFile = File.createTempFile("tmp_h2", ".csv");
             FileOutputStream tempFileStream = new FileOutputStream(tempFile);
             String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
-
-            // If it's the fact table, there will be a facttable.csv.inner or
-            // facttable.csv.left in hbase
-            // otherwise just use lookup.csv
-            InputStream csvStream = metaMgr.getStore().getResource(normalPath + fileNameSuffix);
-            if (csvStream == null) {
-                csvStream = metaMgr.getStore().getResource(normalPath);
-            } else {
-                logger.info("H2 decides to load " + (normalPath + fileNameSuffix) + " for table " + tableDesc.getIdentity());
-            }
+            InputStream csvStream = metaMgr.getStore().getResource(normalPath);
 
             org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 5e14471..a595dc9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -73,7 +73,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
         // Load H2 Tables (inner join)
         H2Database h2DB = new H2Database(h2Connection, config);
-        h2DB.loadAllTables(joinType);
+        h2DB.loadAllTables();
     }
 
     protected static void clean() {
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query00.sql";
+        String queryFileName = "src/test/resources/query/sql_streaming/query07.sql";
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {
@@ -163,6 +163,11 @@ public class ITKylinQueryTest extends KylinTestBase {
     }
 
     @Test
+    public void testStreamingTableQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_streaming",null,true);
+    }
+
+    @Test
     public void testTableauQuery() throws Exception {
         batchExecuteQuery("src/test/resources/query/sql_tableau");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query01.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query01.sql b/query/src/test/resources/query/sql_streaming/query01.sql
new file mode 100644
index 0000000..8a5f302
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query01.sql
@@ -0,0 +1 @@
+select count(*) AS c from streaming_table
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query02.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query02.sql b/query/src/test/resources/query/sql_streaming/query02.sql
new file mode 100644
index 0000000..0fc0f88
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query02.sql
@@ -0,0 +1 @@
+select count(*) AS c ,cast(sum(item_count) as BIGINT) as i from streaming_table group by site,day_start
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query03.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query03.sql b/query/src/test/resources/query/sql_streaming/query03.sql
new file mode 100644
index 0000000..33ab8cb
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query03.sql
@@ -0,0 +1 @@
+select count(*) as c ,hour_start AS h from streaming_table group by hour_start
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query04.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query04.sql b/query/src/test/resources/query/sql_streaming/query04.sql
new file mode 100644
index 0000000..5117c37
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query04.sql
@@ -0,0 +1 @@
+select count(*) as c,sum(gmv) as g,cast(sum(item_count) as BIGINT) AS i from streaming_table group by minute_start
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query05.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query05.sql b/query/src/test/resources/query/sql_streaming/query05.sql
new file mode 100644
index 0000000..566046a
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query05.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where day_start >= DATE'2015-01-02'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query06.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query06.sql b/query/src/test/resources/query/sql_streaming/query06.sql
new file mode 100644
index 0000000..ceabc91
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query06.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start >= {TS '2015-01-02 20:00:00'}  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query07.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query07.sql b/query/src/test/resources/query/sql_streaming/query07.sql
new file mode 100644
index 0000000..b09759d
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query07.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start < {TS '2015-01-02 21:00:00'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query08.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query08.sql b/query/src/test/resources/query/sql_streaming/query08.sql
new file mode 100644
index 0000000..120ef50
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query08.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start < {TS '2015-01-02 21:00:00'} and minute_start > {TS '2015-01-02 20:00:00'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query09.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query09.sql b/query/src/test/resources/query/sql_streaming/query09.sql
new file mode 100644
index 0000000..0d29d92
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query09.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start >= {TS '2015-01-02 20:30:00'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/query/src/test/resources/query/sql_streaming/query10.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_streaming/query10.sql b/query/src/test/resources/query/sql_streaming/query10.sql
new file mode 100644
index 0000000..c92add2
--- /dev/null
+++ b/query/src/test/resources/query/sql_streaming/query10.sql
@@ -0,0 +1 @@
+select count(*) as c from streaming_table where minute_start >= {TS '2015-01-01 20:30:00'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index e683fbc..6d77fa2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -18,15 +18,12 @@
 
 package org.apache.kylin.storage.hbase;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
@@ -35,10 +32,8 @@ import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.common.util.DateFormat;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.*;
 
 /**
  * 
@@ -135,10 +130,10 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
 
     private void initPartitionRange(ColumnValueRange dimRange) {
         if (null != dimRange.getBeginValue()) {
-            this.partitionColumnStartDate = DateFormat.stringToDate(dimRange.getBeginValue()).getTime();
+            this.partitionColumnStartDate = DateFormat.stringToMillis(dimRange.getBeginValue());
         }
         if (null != dimRange.getEndValue()) {
-            this.partitionColumnEndDate = DateFormat.stringToDate(dimRange.getEndValue()).getTime();
+            this.partitionColumnEndDate = DateFormat.stringToMillis(dimRange.getEndValue());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 2c1e728..6086a55 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -69,7 +69,7 @@ public final class StreamingUtil {
         final String topic = kafkaClusterConfig.getTopic();
         final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
         final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
-        final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
+        final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
         logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
         final long result = binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
         logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
@@ -83,8 +83,8 @@ public final class StreamingUtil {
             long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
             long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser);
             // hard to ensure these 2 conditions
-//            Preconditions.checkArgument(startTimestamp <= midTimestamp);
-//            Preconditions.checkArgument(midTimestamp <= endTimestamp);
+            //            Preconditions.checkArgument(startTimestamp <= midTimestamp);
+            //            Preconditions.checkArgument(midTimestamp <= endTimestamp);
             if (startTimestamp >= targetTimestamp) {
                 return startOffset;
             }
@@ -103,6 +103,5 @@ public final class StreamingUtil {
         }
         return startOffset;
 
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8c13aed0/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
new file mode 100644
index 0000000..cba52e8
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
@@ -0,0 +1,113 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses JSON stream messages; each message must carry a non-empty "timestamp" field.
+ */
+public final class TimedJsonStreamParser implements StreamParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
+    private final List<TblColRef> allColumns;
+    private boolean formatTs;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+    public TimedJsonStreamParser(List<TblColRef> allColumns) {
+        this.allColumns = allColumns;
+        this.formatTs = false;
+    }
+
+    public TimedJsonStreamParser(List<TblColRef> allColumns, boolean formatTs) {
+        this.allColumns = allColumns;
+        this.formatTs = formatTs;
+    }
+
+    @Override
+    public ParsedStreamMessage parse(StreamMessage stream) {
+        try {
+            Map<String, String> root = mapper.readValue(stream.getRawData(), mapType);
+            String tsStr = root.get("timestamp");
+            Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field cannot be null");
+            long t = Long.valueOf(root.get("timestamp"));
+            ArrayList<String> result = Lists.newArrayList();
+
+            for (TblColRef column : allColumns) {
+                String columnName = column.getName();
+                if (columnName.equalsIgnoreCase("minute_start")) {
+                    long minuteStart = TimeUtil.getMinuteStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+                } else if (columnName.equalsIgnoreCase("hour_start")) {
+                    long hourStart = TimeUtil.getHourStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+                } else if (columnName.equalsIgnoreCase("day_start")) {
+                    // day_start is always emitted as a yyyy-mm-dd date string, regardless of formatTs
+                    long ts = TimeUtil.getDayStart(t);
+                    result.add(DateFormat.formatToDateStr(ts));
+                } else {
+                    String x = root.get(columnName.toLowerCase());
+                    result.add(x);
+                }
+            }
+
+            return new ParsedStreamMessage(result, stream.getOffset(), t, true);
+
+        } catch (IOException e) {
+            logger.error("error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+}


[2/2] incubator-kylin git commit: Merge branch 'origin/0.8'

Posted by ma...@apache.org.
Merge branch 'origin/0.8'


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/49e722ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/49e722ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/49e722ec

Branch: refs/heads/0.8
Commit: 49e722ec284a31afa79ee364c892be14f3459ff7
Parents: 8c13aed 22cdfa2
Author: honma <ho...@ebay.com>
Authored: Tue Jun 16 19:11:10 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 16 19:11:10 2015 +0800

----------------------------------------------------------------------
 bin/check-sandbox-properties.sh                             | 9 +++++++++
 bin/kylin.sh                                                | 4 ++--
 .../main/java/org/apache/kylin/dict/DictionaryManager.java  | 3 ++-
 3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------