You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/25 08:11:35 UTC

[03/11] incubator-kylin git commit: KYLIN-697 non-integration tests all passed

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
new file mode 100644
index 0000000..492b9bf
--- /dev/null
+++ b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.test;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
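+/** Parameterized variant of ITKylinQueryTest that ranks INVERTED_INDEX ahead of CUBE and HYBRID,
+ * run once per join type ("inner" and "left"), and that also runs the queries under sql_ii. */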
+@RunWith(Parameterized.class)
+public class ITIIQueryTest extends ITKylinQueryTest {
+    @BeforeClass
+    public static void setUp() throws Exception {
+
+        // give II higher priority than other realizations
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        priorities.put(RealizationType.CUBE, 1);
+        priorities.put(RealizationType.HYBRID, 1);
+        RealizationPriorityRule.setPriorities(priorities);
+
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        // put CUBE/HYBRID back ahead of II first, so the ranking is reset even if the parent tearDown() throws
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.INVERTED_INDEX, 1);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.HYBRID, 0);
+        RealizationPriorityRule.setPriorities(priorities);
+        ITKylinQueryTest.tearDown();//invoke super class
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { "inner" }, { "left" } });
+    }
+
+    public ITIIQueryTest(String joinType) throws Exception {
+        // release whatever an earlier instance set up before building the environment again
+        ITKylinQueryTest.clean();
+        // setupAll() reads the static joinType, so it has to be assigned before the call
+        ITKylinQueryTest.joinType = joinType;
+        ITKylinQueryTest.setupAll();
+
+    }
+
+    @Test
+    public void testSingleRunQuery() throws Exception {
+        super.testSingleRunQuery();
+    }
+
+    @Test
+    public void testDetailedQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_ii", null, true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
new file mode 100644
index 0000000..5e14471
--- /dev/null
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.test;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
+import org.dbunit.database.DatabaseConnection;
+import org.dbunit.database.IDatabaseConnection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+@Ignore("KylinQueryTest is contained by ITCombinationTest")
+public class ITKylinQueryTest extends KylinTestBase {
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        printInfo("setUp in KylinQueryTest");
+        joinType = "left";
+
+        setupAll();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        printInfo("tearDown");
+        printInfo("Closing connection...");
+        clean();
+    }
+
+    protected static void setupAll() throws Exception {
+        // create the test metadata and pick up the Kylin config from the environment
+        HBaseMetadataTestCase.staticCreateTestMetadata();
+        config = KylinConfig.getInstanceFromEnv();
+
+        // open a Calcite connection over a temporary OLAP model file for the default project
+        File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
+        Properties props = new Properties();
+        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10000");
+        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
+
+        // open an in-memory H2 database; the counter gives every call its own database name
+        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
+        // load the H2 tables for whichever join type is currently set in joinType
+        H2Database h2DB = new H2Database(h2Connection, config);
+        h2DB.loadAllTables(joinType);
+    }
+
+    protected static void clean() {
+        if (cubeConnection != null)
+            closeConnection(cubeConnection);
+        if (h2Connection != null)
+            closeConnection(h2Connection);
+
+        ObserverEnabler.forceCoprocessorUnset();
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Ignore("this is only for debug")
+    @Test
+    public void testTempQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/temp", null, true);
+    }
+
+    @Test
+    public void testSingleRunQuery() throws Exception {
+        // runs query00.sql twice; only the last runSQL flag differs between the two runs
+        String queryFileName = "src/test/resources/query/sql/query00.sql";
+        // NOTE(review): when the query file is missing this test passes without running anything
+        File sqlFile = new File(queryFileName);
+        if (sqlFile.exists()) {
+            runSQL(sqlFile, true, true);
+            runSQL(sqlFile, true, false);
+        }
+    }
+
+    @Test
+    public void testSingleExecuteQuery() throws Exception {
+
+        String queryFileName = "src/test/resources/query/sql/query53.sql";
+
+        File sqlFile = new File(queryFileName);
+        String sql = getTextFromFile(sqlFile);
+        IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+
+        executeQuery(kylinConn, queryFileName, sql, true);
+    }
+
+    @Ignore
+    @Test
+    public void testTableauProbing() throws Exception {
+        batchExecuteQuery("src/test/resources/query/tableau_probing");
+    }
+
+    @Test
+    public void testCommonQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql", null, true);
+    }
+
+    @Ignore
+    @Test
+    public void testSimpleQuery() throws Exception {
+        verifyResultRowCount("src/test/resources/query/sql_verifyCount");
+    }
+
+    @Test
+    public void testOrderByQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_orderby", null, true);
+        // FIXME
+        // as of optiq 0.8, we lost metadata type with "order by" clause, e.g. sql_orderby/query01.sql
+        // thus, temporarily the "order by" clause was cross out, and the needSort is set to true
+        // execAndCompQuery("src/test/resources/query/sql_orderby", null, false);
+    }
+
+    @Test
+    public void testLookupQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_lookup", null, true);
+    }
+
+    @Test
+    public void testCachedQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_cache", null, true);
+    }
+
+    @Test
+    public void testDerivedColumnQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_derived", null, true);
+    }
+
+    @Test
+    public void testDistinctCountQuery() throws Exception {
+        batchExecuteQuery("src/test/resources/query/sql_distinct");
+    }
+
+    @Test
+    public void testTableauQuery() throws Exception {
+        batchExecuteQuery("src/test/resources/query/sql_tableau");
+    }
+
+    @Test
+    public void testSubQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_subquery", null, true);
+    }
+
+    @Test
+    public void testCaseWhen() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_casewhen", null, true);
+    }
+
+    @Ignore
+    @Test
+    public void testHiveQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_hive", null, true);
+    }
+
+    @Test
+    public void testH2Query() throws Exception {
+        this.execQueryUsingH2("src/test/resources/query/sql_orderby", false);
+    }
+
+    @Test
+    public void testInvalidQuery() throws Exception {
+        // every .sql file under sql_invalid must fail; the first query that succeeds aborts the test
+        printInfo("-------------------- Test Invalid Query --------------------");
+        String queryFolder = "src/test/resources/query/sql_invalid";
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            printInfo("Testing Query " + queryName);
+            String sql = getTextFromFile(sqlFile);
+            IDatabaseConnection cubeConn = new DatabaseConnection(cubeConnection);
+            try {
+                cubeConn.createQueryTable(queryName, sql);
+            } catch (Throwable t) {
+                continue; // expected outcome: the query was rejected, move on to the next file
+            } finally {
+                cubeConn.close();
+            }
+            throw new IllegalStateException(queryName + " should be error!"); // reached only when the query succeeded
+        }
+    }
+
+    @Test
+    public void testDynamicQuery() throws Exception {
+        execAndCompDynamicQuery("src/test/resources/query/sql_dynamic", null, true);
+    }
+
+    @Ignore("simple query will be supported by ii")
+    @Test
+    public void testLimitEnabled() throws Exception {
+        runSqlFile("src/test/resources/query/sql_optimize/enable-limit01.sql");
+        assertLimitWasEnabled();
+    }
+
+    private void assertLimitWasEnabled() {
+        OLAPContext context = getFirstOLAPContext();
+        assertTrue(context.storageContext.isLimitEnabled());
+    }
+
+    private OLAPContext getFirstOLAPContext() {
+        return OLAPContext.getThreadLocalContexts().iterator().next();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
deleted file mode 100644
index cea8ee2..0000000
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.test;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.query.enumerator.OLAPQuery;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.schema.OLAPSchemaFactory;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.dbunit.database.DatabaseConnection;
-import org.dbunit.database.IDatabaseConnection;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.sql.DriverManager;
-import java.util.List;
-import java.util.Properties;
-
-import static org.junit.Assert.assertTrue;
-
-@Ignore("KylinQueryTest is contained by CombinationTest")
-public class KylinQueryTest extends KylinTestBase {
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        printInfo("setUp in KylinQueryTest");
-        joinType = "left";
-
-        setupAll();
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception {
-        printInfo("tearDown");
-        printInfo("Closing connection...");
-        clean();
-    }
-
-    protected static void setupAll() throws Exception {
-        //setup env
-        HBaseMetadataTestCase.staticCreateTestMetadata();
-        config = KylinConfig.getInstanceFromEnv();
-
-        //setup cube conn
-        File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
-        Properties props = new Properties();
-        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10000");
-        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
-
-        //setup h2
-        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
-        // Load H2 Tables (inner join)
-        H2Database h2DB = new H2Database(h2Connection, config);
-        h2DB.loadAllTables(joinType);
-    }
-
-    protected static void clean() {
-        if (cubeConnection != null)
-            closeConnection(cubeConnection);
-        if (h2Connection != null)
-            closeConnection(h2Connection);
-
-        ObserverEnabler.forceCoprocessorUnset();
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    @Ignore("this is only for debug")
-    @Test
-    public void testTempQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/temp", null, true);
-    }
-
-    @Test
-    public void testSingleRunQuery() throws Exception {
-
-        String queryFileName = "src/test/resources/query/sql/query00.sql";
-
-        File sqlFile = new File(queryFileName);
-        if (sqlFile.exists()) {
-            runSQL(sqlFile, true, true);
-            runSQL(sqlFile, true, false);
-        }
-    }
-
-    @Test
-    public void testSingleExecuteQuery() throws Exception {
-
-        String queryFileName = "src/test/resources/query/sql/query53.sql";
-
-        File sqlFile = new File(queryFileName);
-        String sql = getTextFromFile(sqlFile);
-        IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-
-        executeQuery(kylinConn, queryFileName, sql, true);
-    }
-
-    @Ignore
-    @Test
-    public void testTableauProbing() throws Exception {
-        batchExecuteQuery("src/test/resources/query/tableau_probing");
-    }
-
-    @Test
-    public void testCommonQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql", null, true);
-    }
-
-    @Ignore
-    @Test
-    public void testSimpleQuery() throws Exception {
-        verifyResultRowCount("src/test/resources/query/sql_verifyCount");
-    }
-
-    @Test
-    public void testOrderByQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_orderby", null, true);
-        // FIXME
-        // as of optiq 0.8, we lost metadata type with "order by" clause, e.g. sql_orderby/query01.sql
-        // thus, temporarily the "order by" clause was cross out, and the needSort is set to true
-        // execAndCompQuery("src/test/resources/query/sql_orderby", null, false);
-    }
-
-    @Test
-    public void testLookupQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_lookup", null, true);
-    }
-
-    @Test
-    public void testCachedQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_cache", null, true);
-    }
-
-    @Test
-    public void testDerivedColumnQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_derived", null, true);
-    }
-
-    @Test
-    public void testDistinctCountQuery() throws Exception {
-        batchExecuteQuery("src/test/resources/query/sql_distinct");
-    }
-
-    @Test
-    public void testTableauQuery() throws Exception {
-        batchExecuteQuery("src/test/resources/query/sql_tableau");
-    }
-
-    @Test
-    public void testSubQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_subquery", null, true);
-    }
-
-    @Test
-    public void testCaseWhen() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_casewhen", null, true);
-    }
-
-    @Ignore
-    @Test
-    public void testHiveQuery() throws Exception {
-        execAndCompQuery("src/test/resources/query/sql_hive", null, true);
-    }
-
-    @Test
-    public void testH2Query() throws Exception {
-        this.execQueryUsingH2("src/test/resources/query/sql_orderby", false);
-    }
-
-    @Test
-    public void testInvalidQuery() throws Exception {
-
-        printInfo("-------------------- Test Invalid Query --------------------");
-        String queryFolder = "src/test/resources/query/sql_invalid";
-        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
-        for (File sqlFile : sqlFiles) {
-            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
-            printInfo("Testing Query " + queryName);
-            String sql = getTextFromFile(sqlFile);
-            IDatabaseConnection cubeConn = new DatabaseConnection(cubeConnection);
-            try {
-                cubeConn.createQueryTable(queryName, sql);
-            } catch (Throwable t) {
-                continue;
-            } finally {
-                cubeConn.close();
-            }
-            throw new IllegalStateException(queryName + " should be error!");
-        }
-    }
-
-    @Test
-    public void testDynamicQuery() throws Exception {
-        execAndCompDynamicQuery("src/test/resources/query/sql_dynamic", null, true);
-    }
-
-    @Ignore("simple query will be supported by ii")
-    @Test
-    public void testLimitEnabled() throws Exception {
-        runSqlFile("src/test/resources/query/sql_optimize/enable-limit01.sql");
-        assertLimitWasEnabled();
-    }
-
-    private void assertLimitWasEnabled() {
-        OLAPContext context = getFirstOLAPContext();
-        assertTrue(context.storageContext.isLimitEnabled());
-    }
-
-    private OLAPContext getFirstOLAPContext() {
-        return OLAPContext.getThreadLocalContexts().iterator().next();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java b/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
new file mode 100644
index 0000000..9bdb510
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
@@ -0,0 +1,245 @@
+package org.apache.kylin.jdbc;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.junit.*;
+
+import java.io.File;
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+
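+/** Exercises the Kylin JDBC driver against the web app served by an embedded Jetty on port 7070:
+ * schema, catalog, table and column metadata, plain and prepared statements, and result-set iteration. */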
+public class ITJDBCDriverTest extends HBaseMetadataTestCase {
+
+    private static Server server = null;
+
+    private static String previousSpringProfile = null;
+    // name of the system property that is set to "testing" for the duration of this class
+    private static final String SPRING_PROFILE_PROPERTY = "spring.profiles.active";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        previousSpringProfile = System.getProperty(SPRING_PROFILE_PROPERTY);
+        System.setProperty(SPRING_PROFILE_PROPERTY, "testing");
+        staticCreateTestMetadata();
+        startJetty();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        stopJetty();
+        staticCleanupTestMetadata();
+        if (previousSpringProfile == null) {
+            System.clearProperty(SPRING_PROFILE_PROPERTY);
+        } else {
+            System.setProperty(SPRING_PROFILE_PROPERTY, previousSpringProfile);
+        }
+    }
+
+    protected static void stopJetty() throws Exception {
+        if (server != null)
+            server.stop();
+
+        File workFolder = new File("work");
+        if(workFolder.isDirectory() && workFolder.exists()) {
+            FileUtils.deleteDirectory(workFolder);
+        }
+    }
+
+    protected static void startJetty() throws Exception {
+
+        server = new Server(7070);
+
+        WebAppContext context = new WebAppContext();
+        context.setDescriptor("./src/main/webapp/WEB-INF/web.xml");
+        context.setResourceBase("./src/main/webapp");
+        context.setContextPath("/kylin");
+        context.setParentLoaderPriority(true);
+
+        server.setHandler(context);
+
+        server.start();
+
+    }
+
+    protected Connection getConnection() throws Exception {
+
+        Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
+        Properties info = new Properties();
+        info.put("user", "ADMIN");
+        info.put("password", "KYLIN");
+        Connection conn = driver.connect("jdbc:kylin://localhost:7070/default", info);
+
+        return conn;
+    }
+
+    @Test
+    public void testMetadata1() throws Exception {
+        // verifies getSchemas() and getCatalogs() as reported by the driver's DatabaseMetaData
+        // check the JDBC API here: http://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html
+        Connection conn = getConnection();
+
+        // test getSchemas();
+        List<String> schemaList = Lists.newArrayList();
+        DatabaseMetaData dbMetadata = conn.getMetaData();
+        ResultSet resultSet = dbMetadata.getSchemas();
+        while (resultSet.next()) {
+            String schema = resultSet.getString("TABLE_SCHEM");
+            String catalog = resultSet.getString("TABLE_CATALOG");
+
+            System.out.println("Get schema: schema=" + schema + ", catalog=" + catalog);
+            schemaList.add(schema);
+
+        }
+
+        resultSet.close();
+        Assert.assertTrue(schemaList.contains("DEFAULT"));
+        Assert.assertTrue(schemaList.contains("EDW"));
+
+        // test getCatalogs();
+        resultSet = dbMetadata.getCatalogs();
+
+        List<String> catalogList = Lists.newArrayList();
+        while (resultSet.next()) {
+            String catalog = resultSet.getString("TABLE_CAT");
+
+            System.out.println("Get catalog: catalog=" + catalog);
+            catalogList.add(catalog);
+        }
+        resultSet.close(); // release the getCatalogs() result set, as is done for getSchemas() above
+        Assert.assertTrue(catalogList.size() > 0 && catalogList.contains("defaultCatalog"));
+
+        /** //Disable the test on getTableTypes() as it is not ready
+         resultSet = dbMetadata.getTableTypes();
+
+         List<String> tableTypes = Lists.newArrayList();
+         while (resultSet.next()) {
+         String type = resultSet.getString("TABLE_TYPE");
+
+         System.out.println("Get table type: type=" + type);
+         tableTypes.add(type);
+
+         }
+
+         Assert.assertTrue(tableTypes.size() > 0 && tableTypes.contains("TABLE"));
+         resultSet.close();
+
+         **/
+        conn.close();
+    }
+
+    @Test
+    public void testMetadata2() throws Exception {
+        Connection conn = getConnection();
+
+        List<String> tableList = Lists.newArrayList();
+        DatabaseMetaData dbMetadata = conn.getMetaData();
+        ResultSet resultSet = dbMetadata.getTables(null, "%", "%", new String[]{"TABLE"});
+        while (resultSet.next()) {
+            String schema = resultSet.getString("TABLE_SCHEM");
+            String name = resultSet.getString("TABLE_NAME");
+
+            System.out.println("Get table: schema=" + schema + ", name=" + name);
+            tableList.add(schema + "." + name);
+
+        }
+
+        resultSet.close();
+        Assert.assertTrue(tableList.contains("DEFAULT.TEST_KYLIN_FACT"));
+
+        resultSet = dbMetadata.getColumns(null, "%", "TEST_KYLIN_FACT", "%");
+
+        List<String> columns = Lists.newArrayList();
+        while (resultSet.next()) {
+            String name = resultSet.getString("COLUMN_NAME");
+            String type = resultSet.getString("TYPE_NAME");
+
+            System.out.println("Get column: name=" + name + ", data_type=" + type);
+            columns.add(name);
+
+        }
+
+        Assert.assertTrue(columns.size() > 0 && columns.contains("CAL_DT"));
+        resultSet.close();
+        conn.close();
+    }
+
+    @Test
+    public void testSimpleStatement() throws Exception {
+        Connection conn = getConnection();
+        Statement statement = conn.createStatement();
+
+        statement.execute("select count(*) from test_kylin_fact");
+
+        ResultSet rs = statement.getResultSet();
+
+        Assert.assertTrue(rs.next());
+        int result = rs.getInt(1);
+
+        Assert.assertTrue(result > 0);
+
+        rs.close();
+        statement.close();
+        conn.close();
+
+    }
+
+
+    @Test
+    public void testPreparedStatement() throws Exception {
+        Connection conn = getConnection();
+
+        PreparedStatement statement = conn.prepareStatement("select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact " +
+                "where LSTG_FORMAT_NAME = ? group by LSTG_FORMAT_NAME");
+
+        statement.setString(1, "FP-GTC");
+
+        ResultSet rs = statement.executeQuery();
+
+        Assert.assertTrue(rs.next());
+
+        String format_name = rs.getString(1);
+
+        Assert.assertTrue("FP-GTC".equals(format_name));
+
+        rs.close();
+        statement.close();
+        conn.close();
+
+    }
+
+    @Test
+    public void testResultSet() throws Exception {
+        String sql = "select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact \n" +
+                " group by LSTG_FORMAT_NAME ";
+
+        Connection conn = getConnection();
+        Statement statement = conn.createStatement();
+
+        statement.execute(sql);
+
+        ResultSet rs = statement.getResultSet();
+
+        int count = 0;
+        while (rs.next()) {
+            count++;
+            String lstg = rs.getString(1);
+            double gmv = rs.getDouble(2);
+            int trans_count = rs.getInt(3);
+
+            System.out.println("Get a line: LSTG_FORMAT_NAME=" + lstg + ", GMV=" + gmv + ", TRANS_CNT=" + trans_count);
+        }
+
+        Assert.assertTrue(count > 0);
+        // close in reverse order of creation: result set, then statement, then connection
+        rs.close();
+        statement.close();
+        conn.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java b/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java
deleted file mode 100644
index 2eff08f..0000000
--- a/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package org.apache.kylin.jdbc;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.webapp.WebAppContext;
-import org.junit.*;
-
-import java.io.File;
-import java.sql.*;
-import java.util.List;
-import java.util.Properties;
-
-/**
- */
-public class JDBCDriverTest extends HBaseMetadataTestCase {
-
-    private static Server server = null;
-
-    private static String previousSpringProfile = null;
-
-    private static String SPRING_PROFILE_PROPERTY = "spring.profiles.active";
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        previousSpringProfile = System.getProperty(SPRING_PROFILE_PROPERTY);
-        System.setProperty(SPRING_PROFILE_PROPERTY, "testing");
-        staticCreateTestMetadata();
-        startJetty();
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        stopJetty();
-        staticCleanupTestMetadata();
-        if (previousSpringProfile == null) {
-            System.clearProperty(SPRING_PROFILE_PROPERTY);
-        } else {
-            System.setProperty(SPRING_PROFILE_PROPERTY, previousSpringProfile);
-        }
-    }
-
-    protected static void stopJetty() throws Exception {
-        if (server != null)
-            server.stop();
-
-        File workFolder = new File("work");
-        if(workFolder.isDirectory() && workFolder.exists()) {
-            FileUtils.deleteDirectory(workFolder);
-        }
-    }
-
-    protected static void startJetty() throws Exception {
-
-        server = new Server(7070);
-
-        WebAppContext context = new WebAppContext();
-        context.setDescriptor("./src/main/webapp/WEB-INF/web.xml");
-        context.setResourceBase("./src/main/webapp");
-        context.setContextPath("/kylin");
-        context.setParentLoaderPriority(true);
-
-        server.setHandler(context);
-
-        server.start();
-
-    }
-
-    protected Connection getConnection() throws Exception {
-
-        Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
-        Properties info = new Properties();
-        info.put("user", "ADMIN");
-        info.put("password", "KYLIN");
-        Connection conn = driver.connect("jdbc:kylin://localhost:7070/default", info);
-
-        return conn;
-    }
-
-    @Test
-    public void testMetadata1() throws Exception {
-
-        // check the JDBC API here: http://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html
-        Connection conn = getConnection();
-
-        // test getSchemas();
-        List<String> schemaList = Lists.newArrayList();
-        DatabaseMetaData dbMetadata = conn.getMetaData();
-        ResultSet resultSet = dbMetadata.getSchemas();
-        while (resultSet.next()) {
-            String schema = resultSet.getString("TABLE_SCHEM");
-            String catalog = resultSet.getString("TABLE_CATALOG");
-
-            System.out.println("Get schema: schema=" + schema + ", catalog=" + catalog);
-            schemaList.add(schema);
-
-        }
-
-        resultSet.close();
-        Assert.assertTrue(schemaList.contains("DEFAULT"));
-        Assert.assertTrue(schemaList.contains("EDW"));
-
-        // test getCatalogs();
-        resultSet = dbMetadata.getCatalogs();
-
-        List<String> catalogList = Lists.newArrayList();
-        while (resultSet.next()) {
-            String catalog = resultSet.getString("TABLE_CAT");
-
-            System.out.println("Get catalog: catalog=" + catalog);
-            catalogList.add(catalog);
-
-        }
-        Assert.assertTrue(catalogList.size() > 0 && catalogList.contains("defaultCatalog"));
-
-        /** //Disable the test on getTableTypes() as it is not ready
-         resultSet = dbMetadata.getTableTypes();
-
-         List<String> tableTypes = Lists.newArrayList();
-         while (resultSet.next()) {
-         String type = resultSet.getString("TABLE_TYPE");
-
-         System.out.println("Get table type: type=" + type);
-         tableTypes.add(type);
-
-         }
-
-         Assert.assertTrue(tableTypes.size() > 0 && tableTypes.contains("TABLE"));
-         resultSet.close();
-
-         **/
-        conn.close();
-    }
-
-    @Test
-    public void testMetadata2() throws Exception {
-        Connection conn = getConnection();
-
-        List<String> tableList = Lists.newArrayList();
-        DatabaseMetaData dbMetadata = conn.getMetaData();
-        ResultSet resultSet = dbMetadata.getTables(null, "%", "%", new String[]{"TABLE"});
-        while (resultSet.next()) {
-            String schema = resultSet.getString("TABLE_SCHEM");
-            String name = resultSet.getString("TABLE_NAME");
-
-            System.out.println("Get table: schema=" + schema + ", name=" + name);
-            tableList.add(schema + "." + name);
-
-        }
-
-        resultSet.close();
-        Assert.assertTrue(tableList.contains("DEFAULT.TEST_KYLIN_FACT"));
-
-        resultSet = dbMetadata.getColumns(null, "%", "TEST_KYLIN_FACT", "%");
-
-        List<String> columns = Lists.newArrayList();
-        while (resultSet.next()) {
-            String name = resultSet.getString("COLUMN_NAME");
-            String type = resultSet.getString("TYPE_NAME");
-
-            System.out.println("Get column: name=" + name + ", data_type=" + type);
-            columns.add(name);
-
-        }
-
-        Assert.assertTrue(columns.size() > 0 && columns.contains("CAL_DT"));
-        resultSet.close();
-        conn.close();
-    }
-
-    @Test
-    public void testSimpleStatement() throws Exception {
-        Connection conn = getConnection();
-        Statement statement = conn.createStatement();
-
-        statement.execute("select count(*) from test_kylin_fact");
-
-        ResultSet rs = statement.getResultSet();
-
-        Assert.assertTrue(rs.next());
-        int result = rs.getInt(1);
-
-        Assert.assertTrue(result > 0);
-
-        rs.close();
-        statement.close();
-        conn.close();
-
-    }
-
-
-    @Test
-    public void testPreparedStatement() throws Exception {
-        Connection conn = getConnection();
-
-        PreparedStatement statement = conn.prepareStatement("select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact " +
-                "where LSTG_FORMAT_NAME = ? group by LSTG_FORMAT_NAME");
-
-        statement.setString(1, "FP-GTC");
-
-        ResultSet rs = statement.executeQuery();
-
-        Assert.assertTrue(rs.next());
-
-        String format_name = rs.getString(1);
-
-        Assert.assertTrue("FP-GTC".equals(format_name));
-
-        rs.close();
-        statement.close();
-        conn.close();
-
-    }
-
-    @Test
-    public void testResultSet() throws Exception {
-        String sql = "select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact \n" +
-                " group by LSTG_FORMAT_NAME ";
-
-        Connection conn = getConnection();
-        Statement statement = conn.createStatement();
-
-        statement.execute(sql);
-
-        ResultSet rs = statement.getResultSet();
-
-        int count = 0;
-        while (rs.next()) {
-            count++;
-            String lstg = rs.getString(1);
-            double gmv = rs.getDouble(2);
-            int trans_count = rs.getInt(3);
-
-            System.out.println("Get a line: LSTG_FORMAT_NAME=" + lstg + ", GMV=" + gmv + ", TRANS_CNT=" + trans_count);
-        }
-
-        Assert.assertTrue(count > 0);
-        statement.close();
-        rs.close();
-        conn.close();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java
new file mode 100644
index 0000000..75defbf
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.rest.service.CubeService;
+import org.apache.kylin.rest.service.ServiceTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author shaoshi
+ */
+public class ITTableControllerTest extends ServiceTestBase {
+
+    private TableController tableController;
+    private CubeDescController cubeDescController;
+
+    @Autowired
+    CubeService cubeService;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+
+        tableController = new TableController();
+        tableController.setCubeService(cubeService);
+    }
+
+    @Test
+    public void testBasics() throws IOException {
+        List<TableDesc> tables = tableController.getHiveTables(true, "default");
+
+        Assert.assertTrue(tables != null && tables.size() > 0);
+
+        TableDesc factTable = null;
+        for (TableDesc t : tables) {
+            if (t.getName().equalsIgnoreCase("test_kylin_fact")) {
+                factTable = t;
+                break;
+            }
+        }
+
+        Assert.assertNotNull(factTable);
+
+
+        Map<String, String[]> loadResult = tableController.loadHiveTable("test_kylin_fact,TEST_CATEGORY_GROUPINGS", "default");
+        Assert.assertNotNull(loadResult);
+
+        Assert.assertTrue(loadResult.get("result.loaded").length ==2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java
deleted file mode 100644
index 4437abe..0000000
--- a/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.controller;
-
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.rest.service.CubeService;
-import org.apache.kylin.rest.service.ServiceTestBase;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author shaoshi
- */
-public class TableControllerTest extends ServiceTestBase {
-
-    private TableController tableController;
-    private CubeDescController cubeDescController;
-
-    @Autowired
-    CubeService cubeService;
-
-    @Before
-    public void setup() throws Exception {
-        super.setup();
-
-        tableController = new TableController();
-        tableController.setCubeService(cubeService);
-    }
-
-    @Test
-    public void testBasics() throws IOException {
-        List<TableDesc> tables = tableController.getHiveTables(true, "default");
-
-        Assert.assertTrue(tables != null && tables.size() > 0);
-
-        TableDesc factTable = null;
-        for (TableDesc t : tables) {
-            if (t.getName().equalsIgnoreCase("test_kylin_fact")) {
-                factTable = t;
-                break;
-            }
-        }
-
-        Assert.assertNotNull(factTable);
-
-
-        Map<String, String[]> loadResult = tableController.loadHiveTable("test_kylin_fact,TEST_CATEGORY_GROUPINGS", "default");
-        Assert.assertNotNull(loadResult);
-
-        Assert.assertTrue(loadResult.get("result.loaded").length ==2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 559ec89..9f0598d 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -129,6 +129,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
 
     @After
     public void after() throws Exception {
+        cleanupTestMetadata();
     }
 
     private void waitForCounterAndClear(long count) {
@@ -145,16 +146,6 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         }
     }
 
-    @BeforeClass
-    public static void startServer() throws Exception {
-
-    }
-
-    @AfterClass
-    public static void stopServer() throws Exception {
-
-    }
-
     private static CubeManager getCubeManager(KylinConfig config) throws Exception {
         return CubeManager.getInstance(config);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java
new file mode 100644
index 0000000..c92921b
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * @author yangli9
+ */
+public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+    IIInstance ii;
+    IISegment seg;
+    HConnection hconn;
+
+    TableRecordInfo info;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        this.seg = ii.getFirstSegment();
+
+        String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
+        Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
+        hconn = HConnectionManager.createConnection(hconf);
+
+        this.info = new TableRecordInfo(seg);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoad() throws Exception {
+
+        String tableName = seg.getStorageLocationIdentifier();
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+        List<Slice> slices = Lists.newArrayList();
+        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES);
+        try {
+            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+                slices.add(slice);
+            }
+        } finally {
+            kvIterator.close();
+        }
+
+        List<TableRecord> records = iterateRecords(slices);
+        //dump(records);
+        System.out.println(records.size() + " records");
+    }
+
+    private List<TableRecord> iterateRecords(List<Slice> slices) {
+        List<TableRecord> records = Lists.newArrayList();
+        for (Slice slice : slices) {
+            for (RawTableRecord rec : slice) {
+                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
+            }
+        }
+        return records;
+    }
+
+    private void dump(Iterable<TableRecord> records) {
+        for (TableRecord rec : records) {
+            byte[] x = rec.getBytes();
+            String y = BytesUtil.toReadableText(x);
+            System.out.println(y);
+            System.out.println();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
deleted file mode 100644
index fd24487..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
-
-    IIInstance ii;
-    IISegment seg;
-    HConnection hconn;
-
-    TableRecordInfo info;
-
-    @Before
-    public void setup() throws Exception {
-        this.createTestMetadata();
-
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
-        this.seg = ii.getFirstSegment();
-
-        String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
-        Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
-        hconn = HConnectionManager.createConnection(hconf);
-
-        this.info = new TableRecordInfo(seg);
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testLoad() throws Exception {
-
-        String tableName = seg.getStorageLocationIdentifier();
-        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-
-        List<Slice> slices = Lists.newArrayList();
-        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES);
-        try {
-            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
-                slices.add(slice);
-            }
-        } finally {
-            kvIterator.close();
-        }
-
-        List<TableRecord> records = iterateRecords(slices);
-        //dump(records);
-        System.out.println(records.size() + " records");
-    }
-
-    private List<TableRecord> iterateRecords(List<Slice> slices) {
-        List<TableRecord> records = Lists.newArrayList();
-        for (Slice slice : slices) {
-            for (RawTableRecord rec : slice) {
-                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
-            }
-        }
-        return records;
-    }
-
-    private void dump(Iterable<TableRecord> records) {
-        for (TableRecord rec : records) {
-            byte[] x = rec.getBytes();
-            String y = BytesUtil.toReadableText(x);
-            System.out.println(y);
-            System.out.println();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
new file mode 100644
index 0000000..1e32ef5
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.test;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
+import org.junit.*;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class ITStorageTest extends HBaseMetadataTestCase {
+
+    private IStorageEngine storageEngine;
+    private CubeInstance cube;
+    private StorageContext context;
+
+    @BeforeClass
+    public static void setupResource() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+
+        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+        cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
+        Assert.assertNotNull(cube);
+        storageEngine = StorageEngineFactory.getStorageEngine(cube);
+        String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
+        context = new StorageContext();
+        context.setConnUrl(url);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test(expected = ScanOutOfLimitException.class)
+    @Ignore
+    public void testScanOutOfLimit() {
+        context.setThreshold(1);
+        List<TblColRef> groups = StorageMockUtils.buildGroups();
+        List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
+
+        search(groups, aggregations, null, context);
+    }
+
+    @Test
+    public void test01() {
+        List<TblColRef> groups = StorageMockUtils.buildGroups();
+        List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
+        TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
+
+        int count = search(groups, aggregations, filter, context);
+        assertTrue(count > 0);
+    }
+
+    /*
+        @Test
+        public void test02() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+            TupleFilter filter = buildFilter2(groups.get(1));
+
+            int count = search(groups, aggregations, filter, context);
+            assertTrue(count > 0);
+        }
+
+        @Test
+        public void test03() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+            TupleFilter filter = buildAndFilter(groups);
+
+            int count = search(groups, aggregations, filter, context);
+            assertTrue(count > 0);
+        }
+
+        @Test
+        public void test04() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+            TupleFilter filter = buildOrFilter(groups);
+
+            int count = search(groups, aggregations, filter, context);
+            assertTrue(count > 0);
+        }
+
+        @Test
+        public void test05() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+
+            int count = search(groups, aggregations, null, context);
+            assertTrue(count > 0);
+        }
+    */
+    private int search(List<TblColRef> groups, List<FunctionDesc> aggregations, TupleFilter filter, StorageContext context) {
+        int count = 0;
+        ITupleIterator iterator = null;
+        try {
+            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+            iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
+            while (iterator.hasNext()) {
+                ITuple tuple = iterator.next();
+                System.out.println("Tuple = " + tuple);
+                count++;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (iterator != null)
+                iterator.close();
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
deleted file mode 100644
index f9a6212..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.test;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
-import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
-import org.junit.*;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public class StorageTest extends HBaseMetadataTestCase {
-
-    private IStorageEngine storageEngine;
-    private CubeInstance cube;
-    private StorageContext context;
-
-    @BeforeClass
-    public static void setupResource() throws Exception {
-    }
-
-    @AfterClass
-    public static void tearDownResource() {
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-
-        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
-        cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
-        Assert.assertNotNull(cube);
-        storageEngine = StorageEngineFactory.getStorageEngine(cube);
-        String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
-        context = new StorageContext();
-        context.setConnUrl(url);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test(expected = ScanOutOfLimitException.class)
-    @Ignore
-    public void testScanOutOfLimit() {
-        context.setThreshold(1);
-        List<TblColRef> groups = StorageMockUtils.buildGroups();
-        List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
-
-        search(groups, aggregations, null, context);
-    }
-
-    @Test
-    public void test01() {
-        List<TblColRef> groups = StorageMockUtils.buildGroups();
-        List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
-        TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
-
-        int count = search(groups, aggregations, filter, context);
-        assertTrue(count > 0);
-    }
-
-    /*
-        @Test
-        public void test02() {
-            List<TblColRef> groups = buildGroups();
-            List<FunctionDesc> aggregations = buildAggregations();
-            TupleFilter filter = buildFilter2(groups.get(1));
-
-            int count = search(groups, aggregations, filter, context);
-            assertTrue(count > 0);
-        }
-
-        @Test
-        public void test03() {
-            List<TblColRef> groups = buildGroups();
-            List<FunctionDesc> aggregations = buildAggregations();
-            TupleFilter filter = buildAndFilter(groups);
-
-            int count = search(groups, aggregations, filter, context);
-            assertTrue(count > 0);
-        }
-
-        @Test
-        public void test04() {
-            List<TblColRef> groups = buildGroups();
-            List<FunctionDesc> aggregations = buildAggregations();
-            TupleFilter filter = buildOrFilter(groups);
-
-            int count = search(groups, aggregations, filter, context);
-            assertTrue(count > 0);
-        }
-
-        @Test
-        public void test05() {
-            List<TblColRef> groups = buildGroups();
-            List<FunctionDesc> aggregations = buildAggregations();
-
-            int count = search(groups, aggregations, null, context);
-            assertTrue(count > 0);
-        }
-    */
-    private int search(List<TblColRef> groups, List<FunctionDesc> aggregations, TupleFilter filter, StorageContext context) {
-        int count = 0;
-        ITupleIterator iterator = null;
-        try {
-            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
-            iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
-            while (iterator.hasNext()) {
-                ITuple tuple = iterator.next();
-                System.out.println("Tuple = " + tuple);
-                count++;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            if (iterator != null)
-                iterator.close();
-        }
-        return count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
new file mode 100644
index 0000000..3008314
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -0,0 +1,121 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test that starts one {@link KafkaConsumer} per partition of the topic configured
+ * under "kafka_test" and checks how many messages end up in the consumers' stream queues.
+ * Ignored by default because CI does not have the topic.
+ */
+public class ITKafkaConsumerTest extends KafkaBaseTest {
+
+    private OneOffStreamProducer producer;
+
+    private static final int TOTAL_SEND_COUNT = 100;
+
+    private KafkaConfig kafkaConfig;
+
+    @Before
+    public void before() throws IOException {
+        producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
+        producer.start();
+        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+    }
+
+    @After
+    public void after() {
+        producer.stop();
+    }
+
+    /**
+     * Polls every 100 ms until the producer reports that it has stopped.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted; propagated so the
+     *         test aborts instead of clearing the interrupt and polling on
+     */
+    private void waitForProducerToStop(OneOffStreamProducer producer) throws InterruptedException {
+        while (!producer.isStopped()) {
+            Thread.sleep(100);
+        }
+    }
+
+    @Test
+    @Ignore("since ci does not have the topic")
+    public void test() throws InterruptedException {
+        final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+        // fail with a clear message rather than an NPE if no topic metadata is returned
+        assertNotNull("no topic metadata for the kafka_test config", kafkaTopicMeta);
+        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+        try {
+            List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+            for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+                KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig);
+                queues.add(consumer.getStreamQueue(0));
+                executorService.execute(consumer);
+            }
+            waitForProducerToStop(producer);
+
+            //wait some time to ensure consumer has fetched all data
+            Thread.sleep(5000);
+            int count = 0;
+            for (BlockingQueue<StreamMessage> queue : queues) {
+                count += queue.size();
+            }
+
+            logger.info("count of messages are " + count);
+            //since there will be historical data
+            assertTrue(count >= TOTAL_SEND_COUNT && (count % TOTAL_SEND_COUNT == 0));
+        } finally {
+            // ask the consumer threads to stop even when an assertion above fails
+            executorService.shutdownNow();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
new file mode 100644
index 0000000..d8853f6
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.common.KylinConfig;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for {@link KafkaRequester#getKafkaTopicMeta}, run against the "kafka_test"
+ * configuration. Ignored by default because CI does not enable Kafka.
+ */
+public class ITKafkaRequesterTest extends KafkaBaseTest {
+
+    private static final String NON_EXISTENT_TOPIC = "non_existent_topic";
+    private KafkaConfig kafkaConfig;
+
+    @Before
+    public void before() {
+        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+    }
+
+    /**
+     * Expects metadata with two partitions for the configured topic, and no metadata at all
+     * for a topic that does not exist.
+     */
+    @Test
+    @Ignore("since ci does not enable kafka")
+    public void testTopicMeta() throws Exception {
+        TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+        assertNotNull(kafkaTopicMeta);
+        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+
+        // clone of the config, retargeted at a topic that is not expected to exist
+        KafkaConfig anotherTopicConfig = kafkaConfig.clone();
+        anotherTopicConfig.setTopic(NON_EXISTENT_TOPIC);
+
+        kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherTopicConfig);
+        assertNull(kafkaTopicMeta);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
deleted file mode 100644
index c782dae..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.KylinConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- */
-public class KafkaConsumerTest extends KafkaBaseTest {
-
-    private OneOffStreamProducer producer;
-
-    private static final int TOTAL_SEND_COUNT = 100;
-
-    private KafkaConfig kafkaConfig;
-
-    @Before
-    public void before() throws IOException {
-        producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
-        producer.start();
-        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
-    }
-
-    @After
-    public void after() {
-        producer.stop();
-    }
-
-    private void waitForProducerToStop(OneOffStreamProducer producer) {
-        while (!producer.isStopped()) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Test
-    @Ignore("since ci does not have the topic")
-    public void test() throws InterruptedException {
-        final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
-        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig);
-            queues.add(consumer.getStreamQueue(0));
-            executorService.execute(consumer);
-        }
-        waitForProducerToStop(producer);
-
-        //wait some time to ensure consumer has fetched all data
-        Thread.sleep(5000);
-        int count = 0;
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            count += queue.size();
-        }
-
-        logger.info("count of messages are " + count);
-        //since there will be historical data
-        assertTrue(count >= TOTAL_SEND_COUNT && (count % TOTAL_SEND_COUNT == 0));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
deleted file mode 100644
index 9d99825..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import org.apache.kylin.common.KylinConfig;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- */
-public class KafkaRequesterTest extends KafkaBaseTest {
-
-    private static final String NON_EXISTED_TOPIC = "non_existent_topic";
-    private KafkaConfig kafkaConfig;
-
-
-    @Before
-    public void before() {
-        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
-    }
-
-    @AfterClass
-    public static void afterClass() {
-    }
-
-    @Test
-    @Ignore("since ci does not enable kafka")
-    public void testTopicMeta() throws Exception {
-        TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
-        assertNotNull(kafkaTopicMeta);
-        assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
-        assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
-
-        KafkaConfig anotherTopicConfig = kafkaConfig.clone();
-        anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
-
-        kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherTopicConfig);
-        assertTrue(kafkaTopicMeta == null);
-    }
-}