You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/09/23 05:29:15 UTC

[01/11] incubator-kylin git commit: minor, append kylin.log, don't overwrite

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1011 0b9d2012f -> 919cb9951 (forced update)


minor, append kylin.log, don't overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4d05db96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4d05db96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4d05db96

Branch: refs/heads/KYLIN-1011
Commit: 4d05db96dae81e3d375376a28b3343d7eb5fde0d
Parents: db6febc
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Sep 22 10:13:34 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Sep 22 10:13:34 2015 +0800

----------------------------------------------------------------------
 build/bin/kylin.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d05db96/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 2e9cf24..b27864c 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -55,7 +55,7 @@ then
     -Dkylin.hive.dependency=${hive_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
     -Dspring.profiles.active=${spring_profile} \
-    org.apache.hadoop.util.RunJar ${tomcat_root}/bin/bootstrap.jar  org.apache.catalina.startup.Bootstrap start > ${KYLIN_HOME}/logs/kylin.log 2>&1 & echo $! > ${KYLIN_HOME}/pid &
+    org.apache.hadoop.util.RunJar ${tomcat_root}/bin/bootstrap.jar  org.apache.catalina.startup.Bootstrap start >> ${KYLIN_HOME}/logs/kylin.log 2>&1 & echo $! > ${KYLIN_HOME}/pid &
     echo "A new Kylin instance is started by $USER, stop it using \"kylin.sh stop\""
     if [ "$useSandbox" = "true" ]
         then echo "Please visit http://<your_sandbox_ip>:7070/kylin to play with the cubes! (Useranme: ADMIN, Password: KYLIN)"
@@ -170,7 +170,7 @@ then
     -Dkylin.hive.dependency=${hive_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
     -Dspring.profiles.active=${spring_profile} \
-    org.apache.kylin.job.monitor.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
+    org.apache.kylin.job.monitor.MonitorCLI $@ >> ${KYLIN_HOME}/logs/monitor.log 2>&1
     exit 0
 else
     echo "usage: kylin.sh start or kylin.sh stop"


[10/11] incubator-kylin git commit: refactor

Posted by qh...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b5621dc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b5621dc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b5621dc4

Branch: refs/heads/KYLIN-1011
Commit: b5621dc4d6fb1d5bcace85ce2226fb48201601c5
Parents: f5c55d7
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Sep 22 17:12:26 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Sep 22 17:29:32 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/BuildIIWithStreamTest.java | 41 +++++++++++++++++++-
 1 file changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b5621dc4/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 89be628..2e84a84 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -46,6 +46,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -58,8 +61,11 @@ import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -68,6 +74,7 @@ import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -182,6 +189,7 @@ public class BuildIIWithStreamTest {
             }
         }
         final IISegment segment = createSegment(iiName);
+        final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
         String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
         ToolRunner.run(new IICreateHTableJob(), args);
 
@@ -195,12 +203,12 @@ public class BuildIIWithStreamTest {
             if (messages.size() >= iiDesc.getSliceSize()) {
                 messages.add(parse(row));
             } else {
-                sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+                build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
                 messages = Lists.newArrayList();
             }
         }
         if (!messages.isEmpty()) {
-            sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+            build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
         }
 
         reader.close();
@@ -219,6 +227,35 @@ public class BuildIIWithStreamTest {
             }
         }
     }
+    
+    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) {
+        final Slice slice = sliceBuilder.buildSlice(batch);
+        try {
+            loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
+        List<Put> data = Lists.newArrayList();
+        for (IIRow row : codec.encodeKeyValue(slice)) {
+            final byte[] key = row.getKey().get();
+            final byte[] value = row.getValue().get();
+            Put put = new Put(key);
+            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+            final ImmutableBytesWritable dictionary = row.getDictionary();
+            final byte[] dictBytes = dictionary.get();
+            if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
+                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
+            } else {
+                throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
+            }
+            data.add(put);
+        }
+        hTable.put(data);
+        //omit hTable.flushCommits(), because htable is auto flush
+    }
 
     private StreamingMessage parse(String[] row) {
         return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap());


[02/11] incubator-kylin git commit: KYLIN-972 clean ODBC driver code

Posted by qh...@apache.org.
KYLIN-972 clean ODBC driver code


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d7c37e0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d7c37e0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d7c37e0a

Branch: refs/heads/KYLIN-1011
Commit: d7c37e0a44aa815f94bc1187537a0adbaa1273d3
Parents: 4d05db9
Author: honma <ho...@ebay.com>
Authored: Tue Sep 22 10:59:56 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Sep 22 10:59:56 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |   2 +-
 odbc/Common/Dump.cpp                            |  19 ++++++++++++
 odbc/Common/Dump.h                              |  19 ++++++++++++
 odbc/Common/Gzip.cpp                            |  20 +++++++++++++
 odbc/Common/Gzip.h                              |  19 ++++++++++++
 odbc/Common/JDBCODBC.cpp                        |  19 ++++++++++++
 odbc/Common/JDBCODBC.h                          |  19 ++++++++++++
 odbc/Common/JsonConverter.cpp                   |  19 ++++++++++++
 odbc/Common/JsonConverter.h                     |  18 +++++++++++
 odbc/Common/MsgTypes.h                          |  19 ++++++++++++
 odbc/Common/QueryCache.cpp                      |  19 ++++++++++++
 odbc/Common/QueryCache.h                        |  19 ++++++++++++
 odbc/Common/REST.cpp                            |  19 ++++++++++++
 odbc/Common/REST.h                              |  19 ++++++++++++
 odbc/Common/StringUtils.cpp                     |  19 ++++++++++++
 odbc/Common/StringUtils.h                       |  19 ++++++++++++
 odbc/Common/base64.cpp                          |  18 +++++++++++
 odbc/Common/base64.h                            |  18 +++++++++++
 odbc/Driver/KO_ALLOC.CPP                        |  18 +++++++++++
 odbc/Driver/KO_ATTR.CPP                         |  19 ++++++++++++
 odbc/Driver/KO_CONN.CPP                         |  19 ++++++++++++
 odbc/Driver/KO_CTLG.CPP                         |  19 ++++++++++++
 odbc/Driver/KO_Config.cpp                       |  19 ++++++++++++
 odbc/Driver/KO_DESC.CPP                         |  23 ++++++++++++--
 odbc/Driver/KO_DIAG.CPP                         |  18 +++++++++++
 odbc/Driver/KO_DTYPE.CPP                        |  18 +++++++++++
 odbc/Driver/KO_EXEC.CPP                         |  18 +++++++++++
 odbc/Driver/KO_FETCH.CPP                        |  20 ++++++++++++-
 odbc/Driver/KO_INFO.CPP                         |  30 +++++++++++++++----
 odbc/Driver/KO_PARAM.CPP                        |  18 +++++++++++
 odbc/Driver/KO_UTILS.CPP                        |  18 +++++++++++
 odbc/Driver/KylinODBC.CPP                       |  18 +++++++++++
 odbc/Driver/KylinODBC.H                         |  18 +++++++++++
 odbc/Driver/TypeConvertion.h                    |  18 +++++++++++
 odbc/Driver/resource.h                          |  18 +++++++++++
 odbc/Driver/stdafx.cpp                          |  18 +++++++++++
 odbc/Driver/stdafx.h                            |  18 +++++++++++
 odbc/TestDLL/ColorPrint.cpp                     |  18 +++++++++++
 odbc/TestDLL/ColorPrint.h                       |  18 +++++++++++
 odbc/TestDLL/CompareQueryTests.cpp              |  18 +++++++++++
 odbc/TestDLL/QueryFlowTest.cpp                  |  18 +++++++++++
 odbc/TestDLL/Report.cpp                         |  18 +++++++++++
 odbc/TestDLL/RestAPITest.cpp                    |  18 +++++++++++
 odbc/TestDLL/SimpleQueryTest.cpp                |  18 +++++++++++
 odbc/TestDLL/Source.cpp                         |  18 +++++++++++
 odbc/TestDLL/Tests.h                            |  18 +++++++++++
 odbc/doc/reference/0205agarwal.zip              | Bin 858699 -> 0 bytes
 ... driver based on REST service (chinese).docx | Bin 0 -> 681289 bytes
 ...61\345\212\250\347\250\213\345\272\217.docx" | Bin 681289 -> 0 bytes
 pom.xml                                         |  15 ++++++++++
 50 files changed, 855 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 6566a13..c3a9761 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.common.util;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/Dump.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/Dump.cpp b/odbc/Common/Dump.cpp
index e085861..c2301f7 100644
--- a/odbc/Common/Dump.cpp
+++ b/odbc/Common/Dump.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include    <assert.h>
 #include    <stdio.h>
 #include    <ctype.h>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/Dump.h
----------------------------------------------------------------------
diff --git a/odbc/Common/Dump.h b/odbc/Common/Dump.h
index 36f7430..53b980c 100644
--- a/odbc/Common/Dump.h
+++ b/odbc/Common/Dump.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #pragma once
 
 void hexDump ( char* data, int lines, char* buffer, bool forward );
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/Gzip.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/Gzip.cpp b/odbc/Common/Gzip.cpp
index 995540a..bbfc00c 100644
--- a/odbc/Common/Gzip.cpp
+++ b/odbc/Common/Gzip.cpp
@@ -1,3 +1,23 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include <cstdio>
 #include <string>
 #include <cstring>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/Gzip.h
----------------------------------------------------------------------
diff --git a/odbc/Common/Gzip.h b/odbc/Common/Gzip.h
index 2fb4b88..144ea7c 100644
--- a/odbc/Common/Gzip.h
+++ b/odbc/Common/Gzip.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #pragma once
 #include <string>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/JDBCODBC.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/JDBCODBC.cpp b/odbc/Common/JDBCODBC.cpp
index 39e8278..8753fd2 100644
--- a/odbc/Common/JDBCODBC.cpp
+++ b/odbc/Common/JDBCODBC.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include "JDBCODBC.h"
 
 ODBCTypes JDBC2ODBC ( int jtype ) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/JDBCODBC.h
----------------------------------------------------------------------
diff --git a/odbc/Common/JDBCODBC.h b/odbc/Common/JDBCODBC.h
index 71849f5..e5c9eef 100644
--- a/odbc/Common/JDBCODBC.h
+++ b/odbc/Common/JDBCODBC.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #pragma once
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/JsonConverter.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/JsonConverter.cpp b/odbc/Common/JsonConverter.cpp
index adc639a..ce3fdfa 100644
--- a/odbc/Common/JsonConverter.cpp
+++ b/odbc/Common/JsonConverter.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include "JsonConverter.h"
 
 #define ASSIGN_IF_NOT_NULL(x,y,z)  if(!y.is_null())x=y.z

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/JsonConverter.h
----------------------------------------------------------------------
diff --git a/odbc/Common/JsonConverter.h b/odbc/Common/JsonConverter.h
index ac6dbcf..c34c4c6 100644
--- a/odbc/Common/JsonConverter.h
+++ b/odbc/Common/JsonConverter.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 #pragma once
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/MsgTypes.h
----------------------------------------------------------------------
diff --git a/odbc/Common/MsgTypes.h b/odbc/Common/MsgTypes.h
index ba5b427..d898bf1 100644
--- a/odbc/Common/MsgTypes.h
+++ b/odbc/Common/MsgTypes.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 // ----------------------------------------------------------------------------
 //
 // File:        MsgTypes.h

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/QueryCache.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/QueryCache.cpp b/odbc/Common/QueryCache.cpp
index 6c5f869..df8f6ac 100644
--- a/odbc/Common/QueryCache.cpp
+++ b/odbc/Common/QueryCache.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include "QueryCache.h"
 #include <cpprest/http_client.h>
 #include <cpprest/filestream.h>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/QueryCache.h
----------------------------------------------------------------------
diff --git a/odbc/Common/QueryCache.h b/odbc/Common/QueryCache.h
index e3e55e3..8b01651 100644
--- a/odbc/Common/QueryCache.h
+++ b/odbc/Common/QueryCache.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #pragma once
 
 #include "MsgTypes.h"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/REST.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/REST.cpp b/odbc/Common/REST.cpp
index b008f4d..15921aa 100644
--- a/odbc/Common/REST.cpp
+++ b/odbc/Common/REST.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include <cpprest/http_client.h>
 #include <cpprest/filestream.h>
 #include <cpprest/json.h>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/REST.h
----------------------------------------------------------------------
diff --git a/odbc/Common/REST.h b/odbc/Common/REST.h
index 99cd90f..e20d745 100644
--- a/odbc/Common/REST.h
+++ b/odbc/Common/REST.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #pragma once
 
 #include "vld.h"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/StringUtils.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/StringUtils.cpp b/odbc/Common/StringUtils.cpp
index 4dd336b..f31b75b 100644
--- a/odbc/Common/StringUtils.cpp
+++ b/odbc/Common/StringUtils.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include <iostream>
 #include <stdlib.h>
 #include <string>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/StringUtils.h
----------------------------------------------------------------------
diff --git a/odbc/Common/StringUtils.h b/odbc/Common/StringUtils.h
index 154f06b..d42796e 100644
--- a/odbc/Common/StringUtils.h
+++ b/odbc/Common/StringUtils.h
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #pragma once
 
 #include <memory>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/base64.cpp
----------------------------------------------------------------------
diff --git a/odbc/Common/base64.cpp b/odbc/Common/base64.cpp
index e5f2427..7b09664 100644
--- a/odbc/Common/base64.cpp
+++ b/odbc/Common/base64.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "Base64.h"
 #include <iostream>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Common/base64.h
----------------------------------------------------------------------
diff --git a/odbc/Common/base64.h b/odbc/Common/base64.h
index b10860f..f0af44d 100644
--- a/odbc/Common/base64.h
+++ b/odbc/Common/base64.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #pragma once
 
 #include <string>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_ALLOC.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_ALLOC.CPP b/odbc/Driver/KO_ALLOC.CPP
index 6176e5c..a74172d 100644
--- a/odbc/Driver/KO_ALLOC.CPP
+++ b/odbc/Driver/KO_ALLOC.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_ATTR.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_ATTR.CPP b/odbc/Driver/KO_ATTR.CPP
index e9016b4..7cfae45 100644
--- a/odbc/Driver/KO_ATTR.CPP
+++ b/odbc/Driver/KO_ATTR.CPP
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 // ----------------------------------------------------------------------------
 //
 // File:    KO_ATTR.CPP

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_CONN.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_CONN.CPP b/odbc/Driver/KO_CONN.CPP
index 2a5233b..14a2a06 100644
--- a/odbc/Driver/KO_CONN.CPP
+++ b/odbc/Driver/KO_CONN.CPP
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 // ----------------------------------------------------------------------------
 //
 // File:    KO_CONN.CPP

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_CTLG.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_CTLG.CPP b/odbc/Driver/KO_CTLG.CPP
index 0170428..51f9e7d 100644
--- a/odbc/Driver/KO_CTLG.CPP
+++ b/odbc/Driver/KO_CTLG.CPP
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 // ----------------------------------------------------------------------------
 //
 // File:    KO_CTLG.CPP

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_Config.cpp
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_Config.cpp b/odbc/Driver/KO_Config.cpp
index 4fa2773..69b9d97 100644
--- a/odbc/Driver/KO_Config.cpp
+++ b/odbc/Driver/KO_Config.cpp
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 #include "stdafx.h"
 
 #include "StringUtils.h"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_DESC.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_DESC.CPP b/odbc/Driver/KO_DESC.CPP
index 77d8ead..5e15745 100644
--- a/odbc/Driver/KO_DESC.CPP
+++ b/odbc/Driver/KO_DESC.CPP
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+ 
 
 // ----------------------------------------------------------------------------
 //
@@ -2467,8 +2486,8 @@ RETCODE SQL_API _SQLGetIRDItemField ( const pODBCIRD pDesc, const pIRDItem pDesc
             
         case SQL_DESC_FIXED_PREC_SCALE://9
         
-            //SQL_TRUE if the column has a fixed precision and nonzero scale that are data source–specific.
-            //SQL_FALSE if the column does not have a fixed precision and nonzero scale that are data source–specific.
+            //SQL_TRUE if the column has a fixed precision and nonzero scale that are data source–specific.
+            //SQL_FALSE if the column does not have a fixed precision and nonzero scale that are data source–specific.
             if ( isApproximateNumerical ( pDescItem->columnType ) ) {
                 * ( ( Long* ) pDataPtr ) = SQL_FALSE;
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_DIAG.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_DIAG.CPP b/odbc/Driver/KO_DIAG.CPP
index 238bb64..e7cf27e 100644
--- a/odbc/Driver/KO_DIAG.CPP
+++ b/odbc/Driver/KO_DIAG.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_DTYPE.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_DTYPE.CPP b/odbc/Driver/KO_DTYPE.CPP
index 2c1c940..62d6ee9 100644
--- a/odbc/Driver/KO_DTYPE.CPP
+++ b/odbc/Driver/KO_DTYPE.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_EXEC.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_EXEC.CPP b/odbc/Driver/KO_EXEC.CPP
index 6242c48..0f786ca 100644
--- a/odbc/Driver/KO_EXEC.CPP
+++ b/odbc/Driver/KO_EXEC.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_FETCH.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_FETCH.CPP b/odbc/Driver/KO_FETCH.CPP
index 5859eb5..1a87e09 100644
--- a/odbc/Driver/KO_FETCH.CPP
+++ b/odbc/Driver/KO_FETCH.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //
@@ -259,7 +277,7 @@ RETCODE SQL_API SQLColAttribute ( SQLHSTMT        pStmt,
 
 // ----------------------------------------------------------------------
 // to get the basic set of ARD col attributes, ie details recd. from server --- FROM IRD
-// SQLDescribeCol returns the result descriptor — column name,type, column size, decimal digits, and nullability (from msdn)
+// SQLDescribeCol returns the result descriptor — column name,type, column size, decimal digits, and nullability (from msdn)
 // kylin specific
 // ----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_INFO.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_INFO.CPP b/odbc/Driver/KO_INFO.CPP
index cc8356d..10a8d5b 100644
--- a/odbc/Driver/KO_INFO.CPP
+++ b/odbc/Driver/KO_INFO.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //
@@ -66,7 +84,7 @@ RETCODE SQL_API SQLGetInfoW ( SQLHDBC                    pConn,
             
         /*
             An SQLUINTEGER bitmask enumerating the SQL-92 datetime literals supported by the data source. Note that these are the datetime literals listed in the SQL-92 specification and are separate from the datetime literal escape clauses defined by ODBC. For more information about the ODBC datetime literal escape clauses, see Date, Time, and Timestamp Literals.
-            A FIPS Transitional level–conformant driver will always return the "1" value in the bitmask for the bits in the following list. A value of "0" means that SQL-92 datetime literals are not supported.
+            A FIPS Transitional level–conformant driver will always return the "1" value in the bitmask for the bits in the following list. A value of "0" means that SQL-92 datetime literals are not supported.
             The following bitmasks are used to determine which literals are supported:
             SQL_DL_SQL92_DATESQL_DL_SQL92_TIMESQL_DL_SQL92_TIMESTAMPSQL_DL_SQL92_INTERVAL_YEARSQL_DL_SQL92_INTERVAL_MONTHSQL_DL_SQL92_INTERVAL_DAYSQL_DL_SQL92_INTERVAL_HOURSQL_DL_SQL92_INTERVAL_MINUTESQL_DL_SQL92_INTERVAL_SECONDSQL_DL_SQL92_INTERVAL_YEAR_TO_MONTHSQL_DL_SQL92_INTERVAL_DAY_TO_HOUR
             SQL_DL_SQL92_INTERVAL_DAY_TO_MINUTESQL_DL_SQL92_INTERVAL_DAY_TO_SECONDSQL_DL_SQL92_INTERVAL_HOUR_TO_MINUTESQL_DL_SQL92_INTERVAL_HOUR_TO_SECONDSQL_DL_SQL92_INTERVAL_MINUTE_TO_SECOND
@@ -96,7 +114,7 @@ RETCODE SQL_API SQLGetInfoW ( SQLHDBC                    pConn,
             An SQLUINTEGER bitmask enumerating the timestamp intervals supported by the driver and associated data source for the TIMESTAMPADD scalar function.
             The following bitmasks are used to determine which intervals are supported:
             SQL_FN_TSI_FRAC_SECONDSQL_FN_TSI_SECONDSQL_FN_TSI_MINUTESQL_FN_TSI_HOURSQL_FN_TSI_DAYSQL_FN_TSI_WEEKSQL_FN_TSI_MONTHSQL_FN_TSI_QUARTERSQL_FN_TSI_YEAR
-            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.SQL_DATETIME_LITERALS(ODBC 3.0)
+            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.SQL_DATETIME_LITERALS(ODBC 3.0)
         */
         case SQL_TIMEDATE_ADD_INTERVALS : // 109 called
             * ( ( Long* ) pInfoValuePtr ) =
@@ -115,7 +133,7 @@ RETCODE SQL_API SQLGetInfoW ( SQLHDBC                    pConn,
             An SQLUINTEGER bitmask enumerating the timestamp intervals supported by the driver and associated data source for the TIMESTAMPDIFF scalar function.
             The following bitmasks are used to determine which intervals are supported:
             SQL_FN_TSI_FRAC_SECONDSQL_FN_TSI_SECONDSQL_FN_TSI_MINUTESQL_FN_TSI_HOURSQL_FN_TSI_DAYSQL_FN_TSI_WEEKSQL_FN_TSI_MONTHSQL_FN_TSI_QUARTERSQL_FN_TSI_YEAR
-            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.
+            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.
         */
         case SQL_TIMEDATE_DIFF_INTERVALS : //110 called
             * ( ( Long* ) pInfoValuePtr ) =
@@ -610,7 +628,7 @@ RETCODE SQL_API SQLGetInfo ( SQLHDBC         pConn,
             
         /*
             An SQLUINTEGER bitmask enumerating the SQL-92 datetime literals supported by the data source. Note that these are the datetime literals listed in the SQL-92 specification and are separate from the datetime literal escape clauses defined by ODBC. For more information about the ODBC datetime literal escape clauses, see Date, Time, and Timestamp Literals.
-            A FIPS Transitional level–conformant driver will always return the "1" value in the bitmask for the bits in the following list. A value of "0" means that SQL-92 datetime literals are not supported.
+            A FIPS Transitional level–conformant driver will always return the "1" value in the bitmask for the bits in the following list. A value of "0" means that SQL-92 datetime literals are not supported.
             The following bitmasks are used to determine which literals are supported:
             SQL_DL_SQL92_DATESQL_DL_SQL92_TIMESQL_DL_SQL92_TIMESTAMPSQL_DL_SQL92_INTERVAL_YEARSQL_DL_SQL92_INTERVAL_MONTHSQL_DL_SQL92_INTERVAL_DAYSQL_DL_SQL92_INTERVAL_HOURSQL_DL_SQL92_INTERVAL_MINUTESQL_DL_SQL92_INTERVAL_SECONDSQL_DL_SQL92_INTERVAL_YEAR_TO_MONTHSQL_DL_SQL92_INTERVAL_DAY_TO_HOUR
             SQL_DL_SQL92_INTERVAL_DAY_TO_MINUTESQL_DL_SQL92_INTERVAL_DAY_TO_SECONDSQL_DL_SQL92_INTERVAL_HOUR_TO_MINUTESQL_DL_SQL92_INTERVAL_HOUR_TO_SECONDSQL_DL_SQL92_INTERVAL_MINUTE_TO_SECOND
@@ -640,7 +658,7 @@ RETCODE SQL_API SQLGetInfo ( SQLHDBC         pConn,
             An SQLUINTEGER bitmask enumerating the timestamp intervals supported by the driver and associated data source for the TIMESTAMPADD scalar function.
             The following bitmasks are used to determine which intervals are supported:
             SQL_FN_TSI_FRAC_SECONDSQL_FN_TSI_SECONDSQL_FN_TSI_MINUTESQL_FN_TSI_HOURSQL_FN_TSI_DAYSQL_FN_TSI_WEEKSQL_FN_TSI_MONTHSQL_FN_TSI_QUARTERSQL_FN_TSI_YEAR
-            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.SQL_DATETIME_LITERALS(ODBC 3.0)
+            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.SQL_DATETIME_LITERALS(ODBC 3.0)
         */
         case SQL_TIMEDATE_ADD_INTERVALS : // 109 called
             * ( ( Long* ) pInfoValuePtr ) =
@@ -659,7 +677,7 @@ RETCODE SQL_API SQLGetInfo ( SQLHDBC         pConn,
             An SQLUINTEGER bitmask enumerating the timestamp intervals supported by the driver and associated data source for the TIMESTAMPDIFF scalar function.
             The following bitmasks are used to determine which intervals are supported:
             SQL_FN_TSI_FRAC_SECONDSQL_FN_TSI_SECONDSQL_FN_TSI_MINUTESQL_FN_TSI_HOURSQL_FN_TSI_DAYSQL_FN_TSI_WEEKSQL_FN_TSI_MONTHSQL_FN_TSI_QUARTERSQL_FN_TSI_YEAR
-            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.
+            An FIPS Transitional level–conformant driver will always return a bitmask in which all of these bits are set.
         */
         case SQL_TIMEDATE_DIFF_INTERVALS : //110 called
             * ( ( Long* ) pInfoValuePtr ) =

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_PARAM.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_PARAM.CPP b/odbc/Driver/KO_PARAM.CPP
index 8df4fe5..9bbe14c 100644
--- a/odbc/Driver/KO_PARAM.CPP
+++ b/odbc/Driver/KO_PARAM.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ----------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KO_UTILS.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KO_UTILS.CPP b/odbc/Driver/KO_UTILS.CPP
index 9962a6a..a085dca 100644
--- a/odbc/Driver/KO_UTILS.CPP
+++ b/odbc/Driver/KO_UTILS.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ---------------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KylinODBC.CPP
----------------------------------------------------------------------
diff --git a/odbc/Driver/KylinODBC.CPP b/odbc/Driver/KylinODBC.CPP
index bca2748..5eda301 100644
--- a/odbc/Driver/KylinODBC.CPP
+++ b/odbc/Driver/KylinODBC.CPP
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 
 // ---------------------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/KylinODBC.H
----------------------------------------------------------------------
diff --git a/odbc/Driver/KylinODBC.H b/odbc/Driver/KylinODBC.H
index bcce5e2..534a04e 100644
--- a/odbc/Driver/KylinODBC.H
+++ b/odbc/Driver/KylinODBC.H
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 // ----------------------------------------------------------------------------
 //
 // File:        KylinODBC.h

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/TypeConvertion.h
----------------------------------------------------------------------
diff --git a/odbc/Driver/TypeConvertion.h b/odbc/Driver/TypeConvertion.h
index 3b713f9..43fbad7 100644
--- a/odbc/Driver/TypeConvertion.h
+++ b/odbc/Driver/TypeConvertion.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 enum JDBCTypes
 {
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/resource.h
----------------------------------------------------------------------
diff --git a/odbc/Driver/resource.h b/odbc/Driver/resource.h
index 2669bab..eb0d30b 100644
--- a/odbc/Driver/resource.h
+++ b/odbc/Driver/resource.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 //{{NO_DEPENDENCIES}}
 // Microsoft Visual C++ generated include file.
 // Used by GODBC.RC

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/stdafx.cpp
----------------------------------------------------------------------
diff --git a/odbc/Driver/stdafx.cpp b/odbc/Driver/stdafx.cpp
index 1577c4e..9d73fe5 100644
--- a/odbc/Driver/stdafx.cpp
+++ b/odbc/Driver/stdafx.cpp
@@ -1 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "stdafx.h"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/Driver/stdafx.h
----------------------------------------------------------------------
diff --git a/odbc/Driver/stdafx.h b/odbc/Driver/stdafx.h
index a1c80d2..79c6597 100644
--- a/odbc/Driver/stdafx.h
+++ b/odbc/Driver/stdafx.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #pragma message("Compiling precompiled headers. \n")
 
 #include "KylinODBC.h"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/ColorPrint.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/ColorPrint.cpp b/odbc/TestDLL/ColorPrint.cpp
index 77a1f0d..17036b2 100644
--- a/odbc/TestDLL/ColorPrint.cpp
+++ b/odbc/TestDLL/ColorPrint.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include <stdio.h>
 #include <windows.h>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/ColorPrint.h
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/ColorPrint.h b/odbc/TestDLL/ColorPrint.h
index d907f22..472be69 100644
--- a/odbc/TestDLL/ColorPrint.h
+++ b/odbc/TestDLL/ColorPrint.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #pragma once
 
 void setPrintColorRED();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/CompareQueryTests.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/CompareQueryTests.cpp b/odbc/TestDLL/CompareQueryTests.cpp
index 9574789..dbdab90 100644
--- a/odbc/TestDLL/CompareQueryTests.cpp
+++ b/odbc/TestDLL/CompareQueryTests.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "Tests.h"
 #include <vector>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/QueryFlowTest.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/QueryFlowTest.cpp b/odbc/TestDLL/QueryFlowTest.cpp
index 9961e48..6de578b 100644
--- a/odbc/TestDLL/QueryFlowTest.cpp
+++ b/odbc/TestDLL/QueryFlowTest.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "Tests.h"
 
 void queryFlowTest() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/Report.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/Report.cpp b/odbc/TestDLL/Report.cpp
index 27a9530..a637653 100644
--- a/odbc/TestDLL/Report.cpp
+++ b/odbc/TestDLL/Report.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "Tests.h"
 
 void report ( const char* msg ) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/RestAPITest.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/RestAPITest.cpp b/odbc/TestDLL/RestAPITest.cpp
index f16bfb3..31a004f 100644
--- a/odbc/TestDLL/RestAPITest.cpp
+++ b/odbc/TestDLL/RestAPITest.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "Tests.h"
 
 void restAPITest() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/SimpleQueryTest.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/SimpleQueryTest.cpp b/odbc/TestDLL/SimpleQueryTest.cpp
index 50a5432..4ec2b23 100644
--- a/odbc/TestDLL/SimpleQueryTest.cpp
+++ b/odbc/TestDLL/SimpleQueryTest.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "Tests.h"
 
 void simpleQueryTest() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/Source.cpp
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/Source.cpp b/odbc/TestDLL/Source.cpp
index b0b589b..389ebd5 100644
--- a/odbc/TestDLL/Source.cpp
+++ b/odbc/TestDLL/Source.cpp
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #include "vld.h"
 #include "Tests.h"
 using namespace std;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/TestDLL/Tests.h
----------------------------------------------------------------------
diff --git a/odbc/TestDLL/Tests.h b/odbc/TestDLL/Tests.h
index 06dbe9c..98fecee 100644
--- a/odbc/TestDLL/Tests.h
+++ b/odbc/TestDLL/Tests.h
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
 #pragma once
 
 #define prod_KServerAddr ""

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/doc/reference/0205agarwal.zip
----------------------------------------------------------------------
diff --git a/odbc/doc/reference/0205agarwal.zip b/odbc/doc/reference/0205agarwal.zip
deleted file mode 100644
index 4f8e642..0000000
Binary files a/odbc/doc/reference/0205agarwal.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/odbc/doc/reference/how to customize a odbc driver based on REST service (chinese).docx
----------------------------------------------------------------------
diff --git a/odbc/doc/reference/how to customize a odbc driver based on REST service (chinese).docx b/odbc/doc/reference/how to customize a odbc driver based on REST service (chinese).docx
new file mode 100644
index 0000000..402313d
Binary files /dev/null and b/odbc/doc/reference/how to customize a odbc driver based on REST service (chinese).docx differ

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/"odbc/doc/\345\246\202\344\275\225\345\256\232\345\210\266\344\270\200\344\270\252\345\237\272\344\272\216REST Service\347\232\204ODBC\351\251\261\345\212\250\347\250\213\345\272\217.docx"
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d7c37e0a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9ad1ca0..faf3f2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -820,7 +820,10 @@
                                 <exclude>**/*.dic</exclude>
                                 <exclude>**/*.snapshot</exclude>
                                 <exclude>**/*.pdf</exclude>
+                                <exclude>**/*.docx</exclude>
+                                <exclude>**/*.doc</exclude>
                                 <exclude>**/*.log</exclude>
+                                <exclude>**/*.txt</exclude>
 
                                 <exclude>**/.checkstyle</exclude>
                                 <!--Job's Test Data-->
@@ -858,6 +861,18 @@
                                 <!-- HBase MiniCluster Testing Data, for testing only -->
                                 <exclude>examples/test_case_data/minicluster/hbase-export.tar.gz</exclude>
                                 <exclude>examples/test_case_data/**/*.xml</exclude>
+                                
+                                <!--ODBC sub project is a VS project, exclude related files -->
+                                <exclude>**/*.sln</exclude>
+                                <exclude>**/*.vcxproj</exclude>
+                                <exclude>**/*.vcxproj.filters</exclude>
+                                <exclude>**/*.vcxproj.user</exclude>
+                                <exclude>**/*.props</exclude>
+                                <exclude>**/*.RC</exclude>
+                                <exclude>**/*.dsp</exclude>
+                                <exclude>**/*.DEF</exclude>
+                                <exclude>**/*.isl</exclude>
+                                <exclude>**/*.isproj</exclude>
 
                             </excludes>
                         </configuration>


[07/11] incubator-kylin git commit: KYLIN-1039 fix CI, add all joins in query81.sql

Posted by qh...@apache.org.
KYLIN-1039 fix CI, add all joins in query81.sql


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/cc15d63f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/cc15d63f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/cc15d63f

Branch: refs/heads/KYLIN-1011
Commit: cc15d63f0d58ae8cd6c6de383d40123239225a74
Parents: 10febfa
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Sep 22 16:29:59 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Sep 22 16:29:59 2015 +0800

----------------------------------------------------------------------
 query/src/test/resources/query/sql/query81.sql | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc15d63f/query/src/test/resources/query/sql/query81.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query81.sql b/query/src/test/resources/query/sql/query81.sql
index 7302a7d..93868e7 100644
--- a/query/src/test/resources/query/sql/query81.sql
+++ b/query/src/test/resources/query/sql/query81.sql
@@ -18,6 +18,11 @@
 
 select test_cal_dt.week_beg_dt, sum(price) as GMV
  from test_kylin_fact 
- inner JOIN edw.test_cal_dt as test_cal_dt  ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt 
+inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id
  where test_cal_dt.week_beg_dt between DATE '2013-09-01' and DATE '2013-10-01' and (lstg_format_name='FP-GTC' or 'a' = 'b')
  group by test_cal_dt.week_beg_dt
\ No newline at end of file


[04/11] incubator-kylin git commit: KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

Posted by qh...@apache.org.
KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/738422e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/738422e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/738422e1

Branch: refs/heads/KYLIN-1011
Commit: 738422e1030097d887956e769a4bf754b06b750c
Parents: 8581df4
Author: sunyerui <su...@gmail.com>
Authored: Fri Sep 18 19:11:31 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:19:53 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/JoinedFlatTable.java   |   2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |  19 ++-
 .../storage/hbase/steps/HBaseMROutput.java      |   5 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  50 +++++++
 .../steps/HDFSPathGarbageCollectionStep.java    | 138 +++++++++++++++++++
 5 files changed, 209 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae3ccb..5886325 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -72,7 +72,7 @@ public class JoinedFlatTable {
 
         ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
         ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+        ddl.append("LOCATION '" + getTableDir(intermediateTableDesc, storageDfsDir) + "';").append("\n");
         // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
         // ";\n");
         return ddl.toString();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1dcdc94..4571852 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -21,6 +21,8 @@ package org.apache.kylin.source.hive;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -134,6 +136,7 @@ public class HiveMRInput implements IMRInput {
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
             jobFlow.addTask(step);
         }
 
@@ -148,7 +151,6 @@ public class HiveMRInput implements IMRInput {
     }
 
     public static class GarbageCollectionStep extends AbstractExecutable {
-
         @Override
         protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
             KylinConfig config = context.getConfig();
@@ -161,6 +163,13 @@ public class HiveMRInput implements IMRInput {
                 try {
                     config.getCliCommandExecutor().execute(dropHiveCMD);
                     output.append("Hive table " + hiveTable + " is dropped. \n");
+
+                    Path externalDataPath = new Path(getExternalDataPath());
+                    FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+                    if (fs.exists(externalDataPath)) {
+                        fs.delete(externalDataPath, true);
+                        output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
+                    }
                 } catch (IOException e) {
                     logger.error("job:" + getId() + " execute finished with exception", e);
                     return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
@@ -177,6 +186,14 @@ public class HiveMRInput implements IMRInput {
         private String getIntermediateTableIdentity() {
             return getParam("oldHiveTable");
         }
+
+        public void setExternalDataPath(String externalDataPath) {
+            setParam("externalDataPath", externalDataPath);
+        }
+
+        private String getExternalDataPath() {
+            return getParam("externalDataPath");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index 8cbb7ff..c634a1d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.storage.hbase.steps.HBaseMRSteps;
 
 public class HBaseMROutput implements IMROutput {
 
@@ -37,7 +36,7 @@ public class HBaseMROutput implements IMROutput {
 
             @Override
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                // nothing to do
+                steps.addCubingGarbageCollectionSteps(jobFlow);
             }
         };
     }
@@ -54,7 +53,7 @@ public class HBaseMROutput implements IMROutput {
 
             @Override
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
+                steps.addMergingGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index dfb4f33..f9e9b15 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.hbase.steps;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
@@ -129,6 +130,16 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return mergingHTables;
     }
 
+    public List<String> getMergingHDFSPaths() {
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+        final List<String> mergingHDFSPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+        }
+        return mergingHDFSPaths;
+    }
+
     public String getHFilePath(String jobId) {
         return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
     }
@@ -137,4 +148,43 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
+    public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        jobFlow.addTask(createMergeGCStep());
+
+        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+        toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths());
+
+        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
+    public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+        toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId));
+
+        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
new file mode 100644
index 0000000..2ae8ca8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 15/9/17.
+ */
+public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
+
+    private StringBuffer output;
+    private JobEngineConfig config;
+
+    public HDFSPathGarbageCollectionStep() {
+        super();
+        output = new StringBuffer();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        try {
+            config = new JobEngineConfig(context.getConfig());
+            dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration()));
+            dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+        } catch (IOException e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            output.append("\n").append(e.getLocalizedMessage());
+            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+    }
+
+    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
+        if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
+            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
+            for (String path : oldHdfsPaths) {
+                if (path.endsWith("*"))
+                    path = path.substring(0, path.length() - 1);
+
+                Path oldPath = new Path(path);
+                if (fileSystem.exists(oldPath)) {
+                    fileSystem.delete(oldPath, true);
+                    logger.debug("HDFS path " + path + " is dropped.");
+                    output.append("HDFS path " + path + " is dropped.\n");
+                } else {
+                    logger.debug("HDFS path " + path + " not exists.");
+                    output.append("HDFS path " + path + " not exists.\n");
+                }
+                // If hbase was deployed on another cluster, the job dir is empty and should be dropped,
+                // because of rowkey_stats and hfile dirs are both dropped.
+                if (fileSystem.listStatus(oldPath.getParent()).length == 0) {
+                    Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId()));
+                    if (fileSystem.exists(emptyJobPath)) {
+                        fileSystem.delete(emptyJobPath, true);
+                        logger.debug("HDFS path " + emptyJobPath + " is empty and dropped.");
+                        output.append("HDFS path " + emptyJobPath + " is empty and dropped.\n");
+                    }
+                }
+            }
+        }
+    }
+
+    public void setDeletePathsOnHadoopCluster(List<String> deletePaths) {
+        setArrayParam("toDeletePathsOnHadoopCluster", deletePaths);
+    }
+
+    public void setDeletePathsOnHBaseCluster(List<String> deletePaths) {
+        setArrayParam("toDeletePathsOnHBaseCluster", deletePaths);
+    }
+
+    public void setJobId(String jobId) {
+        setParam("jobId", jobId);
+    }
+
+    public List<String> getDeletePathsOnHadoopCluster() {
+        return getArrayParam("toDeletePathsOnHadoopCluster");
+    }
+
+    public List<String> getDeletePathsOnHBaseCluster() {
+        return getArrayParam("toDeletePathsOnHBaseCluster");
+    }
+
+    public String getJobId() {
+        return getParam("jobId");
+    }
+
+    private void setArrayParam(String paramKey, List<String> paramValues) {
+        setParam(paramKey, StringUtils.join(paramValues, ","));
+    }
+
+    private List<String> getArrayParam(String paramKey) {
+        final String ids = getParam(paramKey);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+}


[09/11] incubator-kylin git commit: KYLIN-1011

Posted by qh...@apache.org.
KYLIN-1011


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/55a85df0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/55a85df0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/55a85df0

Branch: refs/heads/KYLIN-1011
Commit: 55a85df0cfbabbe0a176d05ae8830ea4c6ec9fa6
Parents: cc15d63
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Sep 9 19:13:01 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Sep 22 17:29:32 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 +++++++
 build/bin/kylin.sh                              |   4 +-
 build/script/compress.sh                        |   1 +
 .../kylin/engine/streaming/StreamingCLI.java    |  99 ------------
 .../kylin/engine/streaming/cli/MonitorCLI.java  |  70 +++++++++
 .../engine/streaming/cli/StreamingCLI.java      | 120 +++++++++++++++
 .../streaming/monitor/StreamingMonitor.java     | 154 +++++++++++++++++++
 .../engine/streaming/util/StreamingUtils.java   |   2 +-
 .../kafka/ByteBufferBackedInputStream.java      |  53 +++++++
 .../kylin/source/kafka/KafkaStreamingInput.java |   6 +-
 .../source/kafka/TimedJsonStreamParser.java     | 142 +++++++++++++++++
 .../kylin/source/kafka/config/KafkaConfig.java  |  12 ++
 .../apache/kylin/job/monitor/MonitorCLI.java    |  69 ---------
 .../kylin/job/monitor/StreamingMonitor.java     | 154 -------------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 -------
 .../kylin/job/streaming/StreamingBootstrap.java |   2 +-
 16 files changed, 614 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..95fbc9d
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
+import org.apache.kylin.streaming.StreamingConfig;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Loads prepared messages into Kafka (for test use only).
+ */
+public class KafkaDataLoader {
+
+    /**
+     * Sends every message to the topic of {@code streamingConfig}, keyed by its index in
+     * {@code messages}, through the brokers of the first configured Kafka cluster
+     * (string encoder, {@code request.required.acks=1}).
+     *
+     * @param streamingConfig supplies the topic and the broker list (first cluster only)
+     * @param messages payloads to send, in order
+     */
+    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+
+        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
+        Properties props = new Properties();
+        props.put("metadata.broker.list", brokerList);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayListWithCapacity(messages.size());
+        for (int i = 0; i < messages.size(); ++i) {
+            keyedMessages.add(new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i)));
+        }
+
+        Producer<String, String> producer = new Producer<String, String>(config);
+        try {
+            producer.send(keyedMessages);
+        } finally {
+            // Always close the producer, even when send throws.
+            producer.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index b27864c..b581e09 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -115,7 +115,7 @@ then
         -Dkylin.hive.dependency=${hive_dependency} \
         -Dkylin.hbase.dependency=${hbase_dependency} \
         -Dspring.profiles.active=${spring_profile} \
-        org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
+        org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
         echo "streaming started name: $3 id: $4"
         exit 0
     elif [ $2 == "stop" ]
@@ -170,7 +170,7 @@ then
     -Dkylin.hive.dependency=${hive_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
     -Dspring.profiles.active=${spring_profile} \
-    org.apache.kylin.job.monitor.MonitorCLI $@ >> ${KYLIN_HOME}/logs/monitor.log 2>&1
+    org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
     exit 0
 else
     echo "usage: kylin.sh start or kylin.sh stop"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/build/script/compress.sh
----------------------------------------------------------------------
diff --git a/build/script/compress.sh b/build/script/compress.sh
index a424b98..c70e567 100755
--- a/build/script/compress.sh
+++ b/build/script/compress.sh
@@ -21,6 +21,7 @@ rm -rf lib tomcat commit_SHA1
 find kylin-${version} -type d -exec chmod 755 {} \;
 find kylin-${version} -type f -exec chmod 644 {} \;
 find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \;
+mkdir -p ../dist
 tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version}
 rm -rf kylin-${version}
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
deleted file mode 100644
index 8bf52c1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Command-line launcher that parses {@code streaming start} options and runs a one-off
- * streaming build for the requested time range.
- */
-public class StreamingCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
-    /**
-     * Expects {@code args[0] == "streaming"} and {@code args[1] == "start"}, followed by
-     * option/value pairs. Only -streaming, -start and -end feed the build; -oneoff, -partition and
-     * -fillGap are parsed into the config but not consulted here. Exits with 0 after the build
-     * runs, or -1 on any exception (after logging the arguments as invalid).
-     */
-    public static void main(String[] args) {
-        try {
-            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
-
-            Preconditions.checkArgument(args[0].equals("streaming"));
-            Preconditions.checkArgument(args[1].equals("start"));
-
-            int i = 2;
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            while (i < args.length) {
-                String argName = args[i];
-                // Each known option consumes the following token as its value via args[++i].
-                switch (argName) {
-                case "-oneoff":
-                    bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
-                    break;
-                case "-start":
-                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
-                    break;
-                case "-end":
-                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
-                    break;
-                case "-streaming":
-                    bootstrapConfig.setStreaming(args[++i]);
-                    break;
-                case "-partition":
-                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
-                    break;
-                case "-fillGap":
-                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
-                    break;
-                default:
-                    // Unknown tokens are skipped one at a time (no value is consumed).
-                    logger.warn("ignore this arg:" + argName);
-                }
-                i++;
-            }
-            final Runnable runnable = new OneOffStreamingBuilder(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()).build();
-            runnable.run();
-            logger.info("streaming process stop, exit with 0");
-            System.exit(0);
-        } catch (Exception e) {
-            printArgsError(args);
-            logger.error("error start streaming", e);
-            System.exit(-1);
-        }
-    }
-
-    /** Logs the full argument list, space-joined, as invalid. */
-    private static void printArgsError(String[] args) {
-        logger.warn("invalid args:" + StringUtils.join(args, " "));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
new file mode 100644
index 0000000..d7dc6b3
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.engine.streaming.cli;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Command-line entry point for the streaming monitor.
+ *
+ * <p>Expected arguments: {@code monitor -receivers r1;r2 [-host H] [-tableName T]
+ * [-authorization A] [-projectName P] [-cubeName C]}. When {@code -tableName} is given, a
+ * {@code count(*)} query check runs (this also requires {@code -host} and {@code -authorization}).
+ * When {@code -cubeName} is given, the cube's segments are checked for gaps and overlaps.
+ * {@code -projectName} defaults to {@code "default"}. The JVM exits with status 0 once the
+ * requested checks have run.
+ */
+public class MonitorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+    public static void main(String[] args) {
+        Preconditions.checkArgument(args.length > 0 && args[0].equals("monitor"), "first argument must be \"monitor\"");
+
+        List<String> receivers = null;
+        String host = null;
+        String tableName = null;
+        String authorization = null;
+        String cubeName = null;
+        String projectName = "default";
+        for (int i = 1; i < args.length; i++) {
+            String argName = args[i];
+            // Every option takes exactly one value: the next token.
+            switch (argName) {
+            case "-receivers":
+                receivers = Lists.newArrayList(StringUtils.split(requireValue(args, ++i, argName), ";"));
+                break;
+            case "-host":
+                host = requireValue(args, ++i, argName);
+                break;
+            case "-tableName":
+                tableName = requireValue(args, ++i, argName);
+                break;
+            case "-authorization":
+                authorization = requireValue(args, ++i, argName);
+                break;
+            case "-cubeName":
+                cubeName = requireValue(args, ++i, argName);
+                break;
+            case "-projectName":
+                projectName = requireValue(args, ++i, argName);
+                break;
+            default:
+                throw new RuntimeException("invalid argName:" + argName);
+            }
+        }
+        Preconditions.checkArgument(receivers != null && receivers.size() > 0, "at least one receiver is required (-receivers)");
+        final StreamingMonitor streamingMonitor = new StreamingMonitor();
+        if (tableName != null) {
+            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+            Preconditions.checkNotNull(host, "-host is required when -tableName is given");
+            Preconditions.checkNotNull(authorization, "-authorization is required when -tableName is given");
+            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
+        }
+        if (cubeName != null) {
+            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCube(receivers, cubeName, host);
+        }
+        System.exit(0);
+    }
+
+    /** Returns {@code args[index]}, failing with a clear message when option {@code argName} has no value. */
+    private static String requireValue(String[] args, int index, String argName) {
+        Preconditions.checkArgument(index < args.length, "missing value for %s", argName);
+        return args[index];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
new file mode 100644
index 0000000..a4ccabc
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.engine.streaming.cli;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Command-line launcher for one-off streaming builds.
+ */
+public class StreamingCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+    /**
+     * Expects {@code streaming start} followed by option/value pairs
+     * ({@code -oneoff}, {@code -start}, {@code -end}, {@code -streaming}, {@code -partition}, {@code -fillGap}).
+     *
+     * <p>With {@code -fillGap true}, a one-off build is run for every gap between the READY segments
+     * of the streaming's cube. Otherwise a single build is run for [{@code -start}, {@code -end}].
+     * Both modes exit with 0 when done. Any exception logs the arguments as invalid and exits with -1.
+     */
+    public static void main(String[] args) {
+        try {
+            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
+
+            Preconditions.checkArgument(args[0].equals("streaming"));
+            Preconditions.checkArgument(args[1].equals("start"));
+
+            int i = 2;
+            BootstrapConfig bootstrapConfig = new BootstrapConfig();
+            while (i < args.length) {
+                String argName = args[i];
+                switch (argName) {
+                case "-oneoff":
+                    // Previously the parsed value was discarded; store it as the old CLI did.
+                    bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
+                    break;
+                case "-start":
+                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
+                    break;
+                case "-end":
+                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+                    break;
+                case "-streaming":
+                    bootstrapConfig.setStreaming(args[++i]);
+                    break;
+                case "-partition":
+                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                    break;
+                case "-fillGap":
+                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
+                    break;
+                default:
+                    logger.warn("ignore this arg:" + argName);
+                }
+                i++;
+            }
+            if (bootstrapConfig.isFillGap()) {
+                final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
+                Preconditions.checkNotNull(streamingConfig, "no streaming config found for %s", bootstrapConfig.getStreaming());
+                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                logger.info("all gaps:" + StringUtils.join(gaps, ","));
+                for (Pair<Long, Long> gap : gaps) {
+                    startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+                }
+            } else {
+                startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+            }
+            // Exit explicitly in both modes; previously the fill-gap path never reached this.
+            logger.info("streaming process finished, exit with 0");
+            System.exit(0);
+        } catch (Exception e) {
+            printArgsError(args);
+            logger.error("error start streaming", e);
+            System.exit(-1);
+        }
+    }
+
+    /** Builds and synchronously runs a one-off streaming job for the given range. */
+    private static void startOneOffCubeStreaming(String streaming, long start, long end) {
+        final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
+        runnable.run();
+    }
+
+    private static void printArgsError(String[] args) {
+        logger.warn("invalid args:" + StringUtils.join(args, " "));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..a6b8a9f
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -0,0 +1,154 @@
+package org.apache.kylin.engine.streaming.monitor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Sanity checks for streaming cubes whose findings are mailed to a list of receivers:
+ * a {@code count(*)} query through the Kylin REST API, and gap/overlap detection between
+ * consecutive READY segments of a cube.
+ */
+public class StreamingMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
+    /**
+     * Runs {@code select count(*)} on {@code tableName} via {@code host + "/kylin/api/query"} and mails
+     * the outcome (status code plus results/duration, the raw response, or the stack trace) to
+     * {@code receivers}.
+     *
+     * @param authorization value placed after {@code "Basic "} in the Authorization header
+     */
+    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        try {
+
+            request.addRequestHeader("Authorization", "Basic " + authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
+            // Encode explicitly as UTF-8 (the JSON default) instead of the platform charset.
+            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes("UTF-8")));
+
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                title += "succeed";
+                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+            } else {
+                title += "failed";
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            title += "failed";
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
+    /**
+     * Returns the uncovered ranges between consecutive READY segments of the cube, each as
+     * (end of the earlier segment, start of the later segment).
+     */
+    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            }
+        }
+        return gaps;
+    }
+
+    /**
+     * Reloads the cube and returns its READY segments sorted by their natural order
+     * (presumably by date range; see {@code CubeSegment.compareTo}).
+     */
+    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        Preconditions.checkNotNull(cube);
+        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        return segments;
+    }
+
+    /**
+     * Returns the names of consecutive READY segments whose ranges overlap, i.e. the earlier
+     * segment ends after the later one starts. Gaps are not overlaps and are reported by
+     * {@link #findGaps} instead.
+     */
+    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() > second.getDateRangeStart()) {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        return overlaps;
+    }
+
+    /**
+     * Mails {@code receivers} if the cube's READY segments have gaps or overlaps. Returns quietly
+     * (after logging) when the cube does not exist.
+     */
+    public void checkCube(List<String> receivers, String cubeName, String host) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        // Previously this was always an empty list, so overlaps were never reported.
+        List<Pair<String, String>> overlaps = findOverlaps(cubeName);
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+                @Nullable
+                @Override
+                public String apply(Pair<Long, Long> input) {
+                    return parseInterval(input);
+                }
+            }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    /** Formats an interval as {@code {start(date), end(date)}} using the epoch-millis values. */
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+    }
+
+    private void sendMail(List<String> receivers, String title, String content) {
+        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+        mailService.sendMail(receivers, title, content, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 718fc43..47db924 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -53,7 +53,7 @@ public class StreamingUtils {
     }
 
     public static IStreamingOutput getStreamingOutput(String streaming) {
-        return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStreamingOutput");
+        return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
     }
 
     public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..5883493
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link InputStream} view of a {@link ByteBuffer}.
+ * <p>
+ * Bytes are consumed from the buffer's current position up to its limit, so reading
+ * advances the position of the buffer passed in (pass a {@code duplicate()} if the
+ * caller needs to keep its own position).
+ */
+class ByteBufferBackedInputStream extends InputStream {
+
+    private final ByteBuffer buf;
+
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        // mask to 0..255: a data byte must never be mistaken for the -1 end-of-stream marker
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        // InputStream contract: a zero-length read returns 0, even at end of stream
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        int n = Math.min(len, buf.remaining());
+        buf.get(bytes, off, n);
+        return n;
+    }
+
+    @Override
+    public int available() throws IOException {
+        // all remaining bytes are already in memory, so they can be read without blocking
+        return buf.remaining();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 393b8e7..09dee50 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -79,6 +79,7 @@ public class KafkaStreamingInput implements IStreamingInput {
     @Override
     public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) {
         try {
+            logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
             final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
             final KafkaConfig kafkaConfig = kafkaConfigManager.getStreamingConfig(streaming);
@@ -106,6 +107,7 @@ public class KafkaStreamingInput implements IStreamingInput {
                 }
             }
             final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
+            logger.info("finish to get streaming batch, total message count:" + messages.size());
             return new StreamingBatch(messages, timeRange);
         } catch (ReflectiveOperationException e) {
             throw new RuntimeException("failed to create instance of StreamingParser", e);
@@ -220,8 +222,8 @@ public class KafkaStreamingInput implements IStreamingInput {
         });
         if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
             Class clazz = Class.forName(kafkaConfig.getParserName());
-            Constructor constructor = clazz.getConstructor(List.class);
-            return (StreamingParser) constructor.newInstance(columns);
+            Constructor constructor = clazz.getConstructor(List.class, String.class);
+            return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
         } else {
             throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
new file mode 100644
index 0000000..9b5071b
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Parses Kafka messages whose payload is a flat JSON object, read as a map of string values.
+ * <p>
+ * The message timestamp is read (as a long) from the JSON field named by {@code tsColName},
+ * "timestamp" by default; a missing or empty field yields timestamp 0 rather than rejecting
+ * the message. The columns MINUTE_START, HOUR_START and DAY_START are derived from that
+ * timestamp; every other column is looked up by its lower-cased name (null when absent).
+ * <p>
+ * Parser properties are given as {@code "key1=value1;key2=value2"}. Supported keys:
+ * <ul>
+ * <li>{@code formatTs} - when true, MINUTE_START and HOUR_START are emitted as formatted
+ * time strings instead of raw numbers; DAY_START is always emitted as a date string</li>
+ * <li>{@code tsColName} - name of the JSON field holding the timestamp</li>
+ * </ul>
+ */
+public final class TimedJsonStreamParser implements StreamingParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
+    private final List<TblColRef> allColumns;
+    private boolean formatTs = false;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private String tsColName = "timestamp";
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+    /**
+     * @param allColumns    columns to emit, in output order
+     * @param propertiesStr optional {@code "key=value;..."} parser properties; may be null or empty
+     */
+    public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+        this.allColumns = allColumns;
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            for (String prop : propertiesStr.split(";")) {
+                applyProperty(prop);
+            }
+        }
+        logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName);
+    }
+
+    /** Applies one {@code key=value} property; blank, malformed or unknown entries are logged and skipped. */
+    private void applyProperty(String prop) {
+        // limit 2 keeps any '=' that belongs to the value
+        String[] parts = prop.split("=", 2);
+        String key = parts[0].trim();
+        String value = parts.length == 2 ? parts[1].trim() : "";
+        if (key.isEmpty() && value.isEmpty()) {
+            return; // empty entry, e.g. from a trailing ';'
+        }
+        if (key.isEmpty() || value.isEmpty()) {
+            logger.warn("Ignoring malformed parser property '{}', expected key=value", prop);
+            return;
+        }
+        switch (key) {
+        case "formatTs":
+            this.formatTs = Boolean.parseBoolean(value);
+            break;
+        case "tsColName":
+            this.tsColName = value;
+            break;
+        default:
+            logger.warn("Ignoring unknown parser property '{}'", key);
+            break;
+        }
+    }
+
+    @Override
+    public StreamingMessage parse(MessageAndOffset messageAndOffset) {
+        final long offset = messageAndOffset.offset();
+        try {
+            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            long t = parseTimestamp(root.get(tsColName), offset);
+            ArrayList<String> result = Lists.newArrayListWithCapacity(allColumns.size());
+
+            for (TblColRef column : allColumns) {
+                String columnName = column.getName();
+                if (columnName.equalsIgnoreCase("minute_start")) {
+                    result.add(formatTimestamp(TimeUtil.getMinuteStart(t)));
+                } else if (columnName.equalsIgnoreCase("hour_start")) {
+                    result.add(formatTimestamp(TimeUtil.getHourStart(t)));
+                } else if (columnName.equalsIgnoreCase("day_start")) {
+                    // day start is always a date string (yyyy-mm-dd), regardless of formatTs
+                    result.add(DateFormat.formatToDateStr(TimeUtil.getDayStart(t)));
+                } else {
+                    // Locale.ROOT: default-locale lower-casing (e.g. Turkish dotless i) would miss the JSON key
+                    result.add(root.get(columnName.toLowerCase(Locale.ROOT)));
+                }
+            }
+
+            return new StreamingMessage(result, offset, t, Collections.<String, Object>emptyMap());
+        } catch (IOException e) {
+            logger.error("Failed to parse JSON message at offset " + offset, e);
+            throw new RuntimeException("Failed to parse JSON message at offset " + offset, e);
+        }
+    }
+
+    /** Parses the timestamp field; absent or empty yields 0, non-numeric fails with the offending value. */
+    private long parseTimestamp(String tsStr, long offset) {
+        if (StringUtils.isEmpty(tsStr)) {
+            return 0;
+        }
+        try {
+            return Long.parseLong(tsStr);
+        } catch (NumberFormatException e) {
+            NumberFormatException detailed = new NumberFormatException("Invalid timestamp '" + tsStr + "' in field '" + tsColName + "' at offset " + offset);
+            detailed.initCause(e);
+            throw detailed;
+        }
+    }
+
+    /** Renders a period-start timestamp as a time string when formatTs is set, otherwise as its number. */
+    private String formatTimestamp(long ts) {
+        return formatTs ? DateFormat.formatToTimeStr(ts) : String.valueOf(ts);
+    }
+
+    /** Accepts every message; this parser performs no filtering. */
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index b56231a..1aff0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -81,6 +81,10 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    //"configA=1;configB=2"
+    @JsonProperty("parserProperties")
+    private String parserProperties;
+
     public List<KafkaClusterConfig> getKafkaClusterConfigs() {
         return kafkaClusterConfigs;
     }
@@ -141,6 +145,14 @@ public class KafkaConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public String getParserProperties() {
+        return parserProperties;
+    }
+
+    public void setParserProperties(String parserProperties) {
+        this.parserProperties = parserProperties;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
deleted file mode 100644
index 7b9831a..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args[0].equals("monitor"));
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
-                break;
-            case "-host":
-                host = args[++i];
-                break;
-            case "-tableName":
-                tableName = args[++i];
-                break;
-            case "-authorization":
-                authorization = args[++i];
-                break;
-            case "-cubeName":
-                cubeName = args[++i];
-                break;
-            case "-projectName":
-                projectName = args[++i];
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName,host);
-        }
-        System.exit(0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
deleted file mode 100644
index e23f065..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
-    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        try {
-
-            request.addRequestHeader("Authorization", "Basic " + authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
-            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                title += "succeed";
-                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
-                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-            } else {
-                title += "failed";
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            title += "failed";
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    public void checkCube(List<String> receivers, String cubeName, String host) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
-    }
-
-    private void sendMail(List<String> receivers, String title, String content) {
-        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 95fbc9d..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader {
-
-    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
-
-        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
-        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
-            @Nullable
-            @Override
-            public String apply(BrokerConfig brokerConfig) {
-                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
-            }
-        }), ",");
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerList);
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-
-        ProducerConfig config = new ProducerConfig(props);
-
-        Producer<String, String> producer = new Producer<String, String>(config);
-
-        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
-        for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
-            keyedMessages.add(keyedMessage);
-        }
-        producer.send(keyedMessages);
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 4212fea..551006f 100644
--- a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -63,7 +63,7 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.monitor.StreamingMonitor;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.HBaseConnection;


[06/11] incubator-kylin git commit: KYLIN-1039 Add test case to verify if the problem exists

Posted by qh...@apache.org.
KYLIN-1039 Add test case to verify if the problem exists


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/10febfa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/10febfa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/10febfa0

Branch: refs/heads/KYLIN-1011
Commit: 10febfa0d2c555d958122178466e35823106552f
Parents: 985e1fb
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Sep 22 14:59:25 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Sep 22 15:00:02 2015 +0800

----------------------------------------------------------------------
 query/src/test/resources/query/sql/query81.sql  | 23 ++++++++++++++++++++
 .../coprocessor/observer/ObserverEnabler.java   |  8 +++++--
 2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/10febfa0/query/src/test/resources/query/sql/query81.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query81.sql b/query/src/test/resources/query/sql/query81.sql
new file mode 100644
index 0000000..7302a7d
--- /dev/null
+++ b/query/src/test/resources/query/sql/query81.sql
@@ -0,0 +1,23 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- KYLIN-1039 test case: the always-false 'a' = 'b' disjunct looks deliberate; keep it when editing this query
+select test_cal_dt.week_beg_dt, sum(price) as GMV
+ from test_kylin_fact 
+ inner JOIN edw.test_cal_dt as test_cal_dt  ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt 
+ where test_cal_dt.week_beg_dt between DATE '2013-09-01' and DATE '2013-10-01' and (lstg_format_name='FP-GTC' or 'a' = 'b')
+ group by test_cal_dt.week_beg_dt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/10febfa0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index e22cc00..7db2416 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -106,12 +106,16 @@ public class ObserverEnabler {
 
         String forceFlag = System.getProperty(FORCE_COPROCESSOR);
         if (forceFlag != null) {
-            return Boolean.parseBoolean(forceFlag);
+            boolean r = Boolean.parseBoolean(forceFlag);
+            logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " according to sys prop " + FORCE_COPROCESSOR);
+            return r;
         }
 
         Boolean cubeOverride = CUBE_OVERRIDES.get(cube.getName());
         if (cubeOverride != null) {
-            return cubeOverride.booleanValue();
+            boolean r = cubeOverride.booleanValue();
+            logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " according to cube overrides");
+            return r;
         }
 
         //        if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {


[11/11] incubator-kylin git commit: fix

Posted by qh...@apache.org.
fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/919cb995
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/919cb995
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/919cb995

Branch: refs/heads/KYLIN-1011
Commit: 919cb9951562a0379d60b075d1670be2e31a7cd6
Parents: b5621dc
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Sep 23 11:28:35 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Sep 23 11:28:35 2015 +0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/919cb995/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 2e84a84..e77598f 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -200,7 +200,7 @@ public class BuildIIWithStreamTest {
         int count = sorted.size();
         ArrayList<StreamingMessage> messages = Lists.newArrayList();
         for (String[] row : sorted) {
-            if (messages.size() >= iiDesc.getSliceSize()) {
+            if (messages.size() < iiDesc.getSliceSize()) {
                 messages.add(parse(row));
             } else {
                 build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);


[03/11] incubator-kylin git commit: KYLIN-957 Support HBase in a separate cluster

Posted by qh...@apache.org.
KYLIN-957 Support HBase in a separate cluster

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8581df4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8581df4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8581df4a

Branch: refs/heads/KYLIN-1011
Commit: 8581df4a252c313e303d5bd6f93272a5e189ee4e
Parents: d7c37e0
Author: sunyerui <su...@gmail.com>
Authored: Wed Sep 16 00:03:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:17:10 2015 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  4 +++
 .../org/apache/kylin/common/KylinConfig.java    |  6 ++++
 engine-mr/pom.xml                               |  5 +++
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 32 +++++++++++++++++++-
 .../kylin/storage/hbase/HBaseConnection.java    |  8 +++++
 .../kylin/storage/hbase/HBaseResourceStore.java |  4 +--
 .../kylin/storage/hbase/steps/BulkLoadJob.java  |  3 +-
 .../storage/hbase/steps/CreateHTableJob.java    |  2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  5 +--
 .../hbase/util/DeployCoprocessorCLI.java        |  2 +-
 .../hbase/steps/ITHBaseResourceStoreTest.java   |  2 +-
 11 files changed, 64 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 8dfb05b..5b56f31 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -12,6 +12,10 @@ kylin.storage.url=hbase
 # Temp folder in hdfs, make sure user has the right access to the hdfs directory
 kylin.hdfs.working.dir=/kylin
 
+# File system of the cluster that serves HBase, in the form hdfs://hbase-cluster:8020
+# Leave empty if HBase runs on the same cluster as Hive and MapReduce
+kylin.hbase.cluster.fs=
+
 kylin.job.mapreduce.default.reduce.input.mb=500
 
 # If true, job engine will not assume that hadoop CLI reside on the same server as it self

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 43b8c4d..376327a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -104,6 +104,8 @@ public class KylinConfig implements Serializable {
 
     public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
 
+    public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
+
     public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable";
 
     public static final String HIVE_PASSWORD = "hive.password";
@@ -293,6 +295,10 @@ public class KylinConfig implements Serializable {
         return root + getMetadataUrlPrefix() + "/";
     }
 
+    public String getHBaseClusterFs() {
+        return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
+    }
+
     public String getKylinJobLogDir() {
         return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index e00a693..7a2bfe5 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -102,6 +102,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.mrunit</groupId>
             <artifactId>mrunit</artifactId>
             <classifier>hadoop2</classifier>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 1c00993..7fcbf1f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -24,16 +24,25 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HadoopUtil {
+    private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
 
     private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
 
+    private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
     public static void setCurrentConfiguration(Configuration conf) {
         hadoopConfig.set(conf);
     }
@@ -45,6 +54,18 @@ public class HadoopUtil {
         return hadoopConfig.get();
     }
 
+    public static Configuration getCurrentHBaseConfiguration() {
+        if (hbaseConfig.get() == null) {
+            Configuration configuration = HBaseConfiguration.create(new Configuration());
+            String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+            if (StringUtils.isNotEmpty(hbaseClusterFs)) {
+                configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+            }
+            hbaseConfig.set(configuration);
+        }
+        return hbaseConfig.get();
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return FileSystem.get(makeURI(path), getCurrentConfiguration());
     }
@@ -57,6 +78,15 @@ public class HadoopUtil {
         }
     }
 
+    public static String makeQualifiedPathInHBaseCluster(String path) {
+        try {
+            FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+            return fs.makeQualified(new Path(path)).toString();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+        }
+    }
+
     public static String fixWindowsPath(String path) {
         // fix windows path
         if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
@@ -87,7 +117,7 @@ public class HadoopUtil {
     }
 
     public static void deletePath(Configuration conf, Path path) throws IOException {
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
         if (fs.exists(path)) {
             fs.delete(path, true);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index d1bb216..16bb30a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -28,6 +28,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +82,12 @@ public class HBaseConnection {
         conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
         conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
         conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+
+        String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+        if (StringUtils.isNotEmpty(hbaseClusterFs)) {
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+        }
+
         // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
         if (StringUtils.isEmpty(url)) {
             return conf;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 59f9e84..baec4b8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -185,7 +185,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
 
             return fileSystem.open(redirectPath);
@@ -304,7 +304,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         if (fileSystem.exists(redirectPath)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 2be61f4..cbddfae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +61,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
             // end with "/"
             String input = getOptionValue(OPTION_INPUT_PATH);
 
-            Configuration conf = HBaseConfiguration.create(getConf());
+            Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fs = FileSystem.get(conf);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 35a35c1..eb1256c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -103,7 +103,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 
         String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-        Configuration conf = HBaseConfiguration.create(getConf());
+        Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
 
         try {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 03b4361..dfb4f33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -4,6 +4,7 @@ import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -129,11 +130,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public String getHFilePath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 5c7d46e..21d7c38 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -108,7 +108,7 @@ public class DeployCoprocessorCLI {
 
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
index e1976cb..ba95176 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
@@ -80,7 +80,7 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
             assertEquals(content, t);
 
             Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
             assertTrue(fileSystem.exists(redirectPath));
 


[05/11] incubator-kylin git commit: KYLIN-978 small update based on yerui’s patch

Posted by qh...@apache.org.
KYLIN-978 small update based on yerui’s patch

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/985e1fb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/985e1fb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/985e1fb2

Branch: refs/heads/KYLIN-1011
Commit: 985e1fb2047d296ac89b6cf1230915bbb13dfe99
Parents: 738422e
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 22 11:40:26 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:40:26 2015 +0800

----------------------------------------------------------------------
 .../kylin/storage/hbase/steps/HBaseMRSteps.java | 22 +++++------------
 .../steps/HDFSPathGarbageCollectionStep.java    | 26 +++++++++-----------
 2 files changed, 17 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/985e1fb2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index f9e9b15..4901512 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -153,17 +153,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
         jobFlow.addTask(createMergeGCStep());
 
-        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
-        toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths());
-
-        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
-        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
-        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.addAll(getMergingHDFSPaths());
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
         step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
-        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setDeletePaths(toDeletePaths);
         step.setJobId(jobId);
 
         jobFlow.addTask(step);
@@ -172,17 +167,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
     public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
 
-        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
-        toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId));
-
-        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
-        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
-        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.add(getFactDistinctColumnsPath(jobId));
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
         step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
-        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setDeletePaths(toDeletePaths);
         step.setJobId(jobId);
 
         jobFlow.addTask(step);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/985e1fb2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 2ae8ca8..f9f0b80 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -19,7 +19,6 @@ package org.apache.kylin.storage.hbase.steps;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.engine.mr.HadoopUtil;
@@ -40,6 +39,7 @@ import java.util.List;
  */
 public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
 
+    public static final String TO_DELETE_PATHS = "toDeletePaths";
     private StringBuffer output;
     private JobEngineConfig config;
 
@@ -52,8 +52,12 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         try {
             config = new JobEngineConfig(context.getConfig());
-            dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration()));
-            dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+            List<String> toDeletePaths = getDeletePaths();
+            dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentConfiguration()));
+            
+            if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
+                dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+            }
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
@@ -94,24 +98,16 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
         }
     }
 
-    public void setDeletePathsOnHadoopCluster(List<String> deletePaths) {
-        setArrayParam("toDeletePathsOnHadoopCluster", deletePaths);
-    }
-
-    public void setDeletePathsOnHBaseCluster(List<String> deletePaths) {
-        setArrayParam("toDeletePathsOnHBaseCluster", deletePaths);
+    public void setDeletePaths(List<String> deletePaths) {
+        setArrayParam(TO_DELETE_PATHS, deletePaths);
     }
 
     public void setJobId(String jobId) {
         setParam("jobId", jobId);
     }
 
-    public List<String> getDeletePathsOnHadoopCluster() {
-        return getArrayParam("toDeletePathsOnHadoopCluster");
-    }
-
-    public List<String> getDeletePathsOnHBaseCluster() {
-        return getArrayParam("toDeletePathsOnHBaseCluster");
+    public List<String> getDeletePaths() {
+        return getArrayParam(TO_DELETE_PATHS);
     }
 
     public String getJobId() {


[08/11] incubator-kylin git commit: refactor

Posted by qh...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f5c55d7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f5c55d7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f5c55d7b

Branch: refs/heads/KYLIN-1011
Commit: f5c55d7b431486b785ccafb4de904d43809480b9
Parents: 55a85df
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Sep 22 16:48:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Sep 22 17:29:32 2015 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 -
 .../kylin/job/BuildCubeWithStreamTest.java      |  35 +++--
 .../apache/kylin/job/BuildIIWithStreamTest.java |  43 +++---
 .../java/org/apache/kylin/job/DeployUtil.java   |  32 ++---
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  85 -----------
 .../kylin/job/hadoop/invertedindex/IITest.java  |  39 ++---
 .../job/streaming/CubeStreamConsumerTest.java   |  90 ------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  11 +-
 .../streaming/PeriodicalStreamBuilderTest.java  | 144 -------------------
 .../invertedindex/streaming/SliceBuilder.java   |  81 +++++++++++
 .../source/kafka/StringStreamingParser.java     |  65 +++++++++
 11 files changed, 220 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 99557fb..9f17913 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>kylin-invertedindex</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-streaming</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index b02b2f2..6bfd560 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,21 +34,18 @@
 
 package org.apache.kylin.job;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
 import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -56,6 +53,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
 /**
  *  for streaming cubing case "test_streaming_table"
  */
@@ -84,12 +85,14 @@ public class BuildCubeWithStreamTest {
 
         kylinConfig = KylinConfig.getInstanceFromEnv();
 
-        //Use a random toplic for kafka data stream
-        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
+        final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+
+        //Use a random topic for kafka data stream
+        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
         streamingConfig.setTopic(UUID.randomUUID().toString());
-        StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+        KafkaConfigManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
 
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig);
     }
 
     @AfterClass
@@ -120,13 +123,7 @@ public class BuildCubeWithStreamTest {
     @Test
     public void test() throws Exception {
         for (long start = startTime; start < endTime; start += batchInterval) {
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            bootstrapConfig.setStart(start);
-            bootstrapConfig.setEnd(start + batchInterval);
-            bootstrapConfig.setOneOff(true);
-            bootstrapConfig.setPartitionId(0);
-            bootstrapConfig.setStreaming(streamingName);
-            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
+            new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 5ca3b29..89be628 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -39,11 +39,7 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -55,12 +51,16 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -69,9 +69,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.source.hive.HiveTableReader;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -184,32 +181,30 @@ public class BuildIIWithStreamTest {
                 logger.info("measure:" + tblColRef.getName());
             }
         }
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
         final IISegment segment = createSegment(iiName);
         String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
         ToolRunner.run(new IICreateHTableJob(), args);
 
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize());
+        final IIDesc iiDesc = segment.getIIDesc();
+        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary());
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();
+        ArrayList<StreamingMessage> messages = Lists.newArrayList();
         for (String[] row : sorted) {
-            logger.info("another row: " + StringUtils.join(row, ","));
-            queue.put(parse(row));
+            messages.add(parse(row));
+            // flush once the batch reaches the slice size; NOTE(review): the returned Slice is discarded here - confirm it reaches the htable
+            if (messages.size() >= iiDesc.getSliceSize()) {
+                sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+                messages = Lists.newArrayList();
+            }
+        }
+        if (!messages.isEmpty()) {
+            sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
         }
 
         reader.close();
         logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
-        queue.put(StreamMessage.EOF);
-        final Future<?> future = executorService.submit(streamBuilder);
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            fail("stream build failed");
-        }
-
         logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
     }
 
@@ -225,8 +220,8 @@ public class BuildIIWithStreamTest {
         }
     }
 
-    private StreamMessage parse(String[] row) {
-        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap());
     }
 
     private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index d47a664..9ec4c88 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,14 +18,9 @@
 
 package org.apache.kylin.job;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -35,6 +30,7 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
 import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -43,15 +39,16 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.*;
+import java.util.List;
 
 public class DeployUtil {
     private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -146,14 +143,13 @@ public class DeployUtil {
         deployHiveTables();
     }
 
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
         TableDesc tableDesc = cubeInstance.getFactTableDesc();
-
         //load into kafka
-        KafkaDataLoader.loadIntoKafka(streamingConfig, data);
-        logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic());
+        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
+        logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
 
         //csv data for H2 use
         List<TblColRef> tableColumns = Lists.newArrayList();
@@ -163,7 +159,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
+            List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
deleted file mode 100644
index 6a615cb..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-@Ignore("this test case will break existing metadata store")
-public class ITKafkaBasedIIStreamBuilderTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
-
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
-        bootstrap.start("eagle", 0);
-        Thread.sleep(30 * 60 * 1000);
-        logger.info("time is up, stop streaming");
-        bootstrap.stop();
-        Thread.sleep(5 * 1000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..913a2f7 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,12 +10,18 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
@@ -26,6 +32,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -33,6 +40,8 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.StreamingParser;
+import org.apache.kylin.source.kafka.StringStreamingParser;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -41,18 +50,11 @@ import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionar
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StringStreamParser;
-import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -78,24 +80,23 @@ public class IITest extends LocalFileMetadataTestCase {
         this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
         this.iiDesc = ii.getDescriptor();
 
-        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
+        List<MessageAndOffset> messages = Lists.transform(Arrays.asList(inputData), new Function<String, MessageAndOffset>() {
             @Nullable
             @Override
-            public StreamMessage apply(String input) {
-                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
+            public MessageAndOffset apply(String input) {
+                return new MessageAndOffset(new Message(input.getBytes()), System.currentTimeMillis());
             }
         });
 
-        List<List<String>> parsedStreamMessages = Lists.newArrayList();
-        StreamParser parser = StringStreamParser.instance;
-
-        MicroStreamBatch batch = new MicroStreamBatch(0);
-        for (StreamMessage message : streamMessages) {
-            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
-            if ((parsedStreamMessage.isAccepted())) {
-                batch.add(parsedStreamMessage);
+        final StreamingParser parser = StringStreamingParser.instance;
+        final List<StreamingMessage> streamingMessages = Lists.newArrayList(Lists.transform(messages, new Function<MessageAndOffset, StreamingMessage>() { // copy eagerly: transform() is a lazy view that would re-parse on every access
+            @Nullable
+            @Override
+            public StreamingMessage apply(@Nullable MessageAndOffset input) {
+                return parser.parse(input);
             }
-        }
+        }));
+        StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));
 
         iiRows = Lists.newArrayList();
         final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
deleted file mode 100644
index be4fa26..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-@Ignore
-public class CubeStreamConsumerTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
-
-    private KylinConfig kylinConfig;
-
-    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        // remove all existing segments
-        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
-    }
-
-    @Test
-    public void test() throws Exception {
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(queue);
-        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
-        loadDataFromLocalFile(queue, 100000);
-        future.get();
-    }
-
-    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int count = 0;
-        while ((line = br.readLine()) != null && count++ < maxCount) {
-            final List<String> strings = Arrays.asList(line.split("\t"));
-            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
-        }
-        queue.put(StreamMessage.EOF);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 95fbc9d..c3caa9b 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -10,22 +10,21 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 
 /**
  * Load prepared data into kafka(for test use)
  */
 public class KafkaDataLoader {
 
-    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+    public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
 
-        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
         String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
             @Nullable
             @Override
@@ -44,7 +43,7 @@ public class KafkaDataLoader {
 
         List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
         for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
             keyedMessages.add(keyedMessage);
         }
         producer.send(keyedMessages);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
deleted file mode 100644
index dc6d312..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.MicroStreamBatchConsumer;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
-
-    @Before
-    public void setup() {
-        this.createTestMetadata();
-
-    }
-
-    @After
-    public void clear() {
-        this.cleanupTestMetadata();
-    }
-
-    private List<StreamMessage> prepareTestData(long start, long end, int count) {
-        double step = (double) (end - start) / (count - 1);
-        long ts = start;
-        int offset = 0;
-        ArrayList<StreamMessage> result = Lists.newArrayList();
-        for (int i = 0; i < count - 1; ++i) {
-            result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
-            ts += step;
-        }
-        result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
-        assertEquals(count, result.size());
-        assertEquals(start + "", new String(result.get(0).getRawData()));
-        assertEquals(end + "", new String(result.get(count - 1).getRawData()));
-        return result;
-    }
-
-    @Test
-    public void test() throws ExecutionException, InterruptedException {
-
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-
-        final long interval = 3000L;
-        final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
-
-        final List<Integer> partitionIds = Lists.newArrayList();
-        for (int i = 0; i < queues.size(); i++) {
-            partitionIds.add(i);
-        }
-
-        final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
-            @Override
-            public void consume(MicroStreamBatch microStreamBatch) throws Exception {
-                logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
-            }
-
-            @Override
-            public void stop() {
-                logger.info("consumer stopped");
-            }
-        };
-        final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
-
-        streamBuilder.setStreamParser(new StreamParser() {
-            @Override
-            public ParsedStreamMessage parse(StreamMessage streamMessage) {
-                return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
-            }
-        });
-
-        Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
-        long timeout = nextPeriodStart + interval;
-        int messageCount = 0;
-        int inPeriodMessageCount = 0;
-        int expectedOffset = 0;
-        logger.info("prepare to add StreamMessage");
-        while (true) {
-            long ts = System.currentTimeMillis();
-            if (ts >= timeout + interval) {
-                break;
-            }
-            if (ts >= nextPeriodStart && ts < timeout) {
-                inPeriodMessageCount++;
-            }
-            for (BlockingQueue<StreamMessage> queue : queues) {
-                queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
-            }
-            if (expectedOffset == 0 && ts >= timeout) {
-                expectedOffset = messageCount - 1;
-            }
-            messageCount++;
-            Thread.sleep(10);
-        }
-        logger.info("totally put " + messageCount + " StreamMessages");
-        logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.put(StreamMessage.EOF);
-        }
-
-        future.get();
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.take();
-        }
-
-        final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
-        logger.info("offset:" + offsets);
-        for (Long offset : offsets.values()) {
-            assertEquals(expectedOffset, offset.longValue());
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
new file mode 100644
index 0000000..ba337c8
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.invertedindex.index.BatchSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Builds an inverted-index {@link Slice} from one {@link StreamingBatch} of messages.
+ *
+ * <p>The data of each message (a list of column values) becomes one {@link TableRecord}. When
+ * {@code useLocalDict} is true, dictionaries are built from the batch itself via
+ * {@link IIDictionaryBuilder#buildDictionary}; otherwise an all-null dictionary array, sized to
+ * the number of columns of the {@link IIDesc}, is used.
+ */
+public final class SliceBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+    private final boolean useLocalDict;
+
+    /**
+     * @param desc         inverted-index descriptor the slices are built for
+     * @param shard        shard id passed to the underlying {@link BatchSliceMaker}
+     * @param useLocalDict whether to build dictionaries from each batch's own messages
+     */
+    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+        this.iiDesc = desc;
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
+        this.useLocalDict = useLocalDict;
+    }
+
+    /**
+     * Converts a batch of streaming messages into a slice.
+     *
+     * @param microStreamBatch batch whose messages' data lists become the table rows
+     * @return the slice produced by the {@link BatchSliceMaker}, with its local dictionaries set
+     */
+    public Slice buildSlice(StreamingBatch microStreamBatch) {
+        // Lazy view (Lists.transform): getData() is invoked each time an element is read.
+        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+            @Override
+            public List<String> apply(StreamingMessage input) {
+                return input.getData();
+            }
+        });
+        // Without local dictionaries the array is left all-null, one slot per column.
+        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+        final TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+        return build(messages, tableRecordInfo, dictionaries);
+    }
+
+    /**
+     * Encodes every row through {@code tableRecordInfo} and passes the records to the slice maker.
+     *
+     * @param table           rows of column values; value i of a row is written to column i
+     * @param tableRecordInfo record layout used to create each {@link TableRecord}
+     * @param localDictionary dictionaries attached to the resulting slice
+     * @return the slice built from {@code table}
+     */
+    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        // Lazy view (Lists.transform): a new TableRecord is created each time an element is read.
+        final List<TableRecord> records = Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Override
+            public TableRecord apply(List<String> input) {
+                final TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
+            }
+        });
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), records);
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
new file mode 100644
index 0000000..307f73a
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+/**
+ * {@link StreamingParser} that reads a Kafka message payload as text and splits it on commas.
+ *
+ * <p>Holds no state; the constructor is private, so use the shared {@link #instance}.
+ */
+public final class StringStreamingParser implements StreamingParser {
+
+    public static final StringStreamingParser instance = new StringStreamingParser();
+
+    private StringStreamingParser() {
+    }
+
+    /**
+     * Decodes the payload as UTF-8 and splits it into fields on {@code ','}.
+     *
+     * <p>The Kafka offset is passed as both offset arguments of the resulting
+     * {@link StreamingMessage}, together with an empty map.
+     * NOTE(review): {@link String#split(String)} drops trailing empty fields ("a,b,," yields
+     * [a, b]); confirm downstream consumers tolerate rows shorter than the schema.
+     *
+     * @param kafkaMessage message fetched from Kafka
+     * @return the parsed message
+     */
+    @Override
+    public StreamingMessage parse(MessageAndOffset kafkaMessage) {
+        final ByteBuffer payload = kafkaMessage.message().payload();
+        // Size by remaining(), not limit(): get(byte[]) reads from position up to limit, so
+        // limit() over-allocates (and get() underflows) whenever position is non-zero.
+        final byte[] bytes = new byte[payload.remaining()];
+        payload.get(bytes);
+        // Decode explicitly instead of relying on the JVM's platform-default charset.
+        // NOTE(review): assumes producers write UTF-8 text — confirm.
+        final String text = new String(bytes, StandardCharsets.UTF_8);
+        return new StreamingMessage(Lists.newArrayList(text.split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object>emptyMap());
+    }
+
+    /** Accepts every message. */
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+}