You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/25 08:11:35 UTC
[03/11] incubator-kylin git commit: KYLIN-697 non-integration tests
all passed
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
new file mode 100644
index 0000000..492b9bf
--- /dev/null
+++ b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.test;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ */
+@RunWith(Parameterized.class)
+public class ITIIQueryTest extends ITKylinQueryTest {
+ @BeforeClass
+ public static void setUp() throws Exception {
+
+ // give II higher priority than other realizations
+ Map<RealizationType, Integer> priorities = Maps.newHashMap();
+ priorities.put(RealizationType.INVERTED_INDEX, 0);
+ priorities.put(RealizationType.CUBE, 1);
+ priorities.put(RealizationType.HYBRID, 1);
+ RealizationPriorityRule.setPriorities(priorities);
+
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ITKylinQueryTest.tearDown();//invoke super class
+
+ Map<RealizationType, Integer> priorities = Maps.newHashMap();
+ priorities.put(RealizationType.INVERTED_INDEX, 1);
+ priorities.put(RealizationType.CUBE, 0);
+ priorities.put(RealizationType.HYBRID, 0);
+ RealizationPriorityRule.setPriorities(priorities);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> configs() {
+ return Arrays.asList(new Object[][] { { "inner" }, { "left" } });
+ }
+
+ public ITIIQueryTest(String joinType) throws Exception {
+
+ ITKylinQueryTest.clean();
+
+ ITKylinQueryTest.joinType = joinType;
+ ITKylinQueryTest.setupAll();
+
+ }
+
+ @Test
+ public void testSingleRunQuery() throws Exception {
+ super.testSingleRunQuery();
+ }
+
+ @Test
+ public void testDetailedQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_ii", null, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
new file mode 100644
index 0000000..5e14471
--- /dev/null
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.test;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
+import org.dbunit.database.DatabaseConnection;
+import org.dbunit.database.IDatabaseConnection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+@Ignore("KylinQueryTest is contained by ITCombinationTest")
+public class ITKylinQueryTest extends KylinTestBase {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ printInfo("setUp in KylinQueryTest");
+ joinType = "left";
+
+ setupAll();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ printInfo("tearDown");
+ printInfo("Closing connection...");
+ clean();
+ }
+
+ protected static void setupAll() throws Exception {
+ //setup env
+ HBaseMetadataTestCase.staticCreateTestMetadata();
+ config = KylinConfig.getInstanceFromEnv();
+
+ //setup cube conn
+ File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
+ Properties props = new Properties();
+ props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10000");
+ cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
+
+ //setup h2
+ h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
+ // Load H2 Tables (inner join)
+ H2Database h2DB = new H2Database(h2Connection, config);
+ h2DB.loadAllTables(joinType);
+ }
+
+ protected static void clean() {
+ if (cubeConnection != null)
+ closeConnection(cubeConnection);
+ if (h2Connection != null)
+ closeConnection(h2Connection);
+
+ ObserverEnabler.forceCoprocessorUnset();
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
+ }
+
+ @Ignore("this is only for debug")
+ @Test
+ public void testTempQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/temp", null, true);
+ }
+
+ @Test
+ public void testSingleRunQuery() throws Exception {
+
+ String queryFileName = "src/test/resources/query/sql/query00.sql";
+
+ File sqlFile = new File(queryFileName);
+ if (sqlFile.exists()) {
+ runSQL(sqlFile, true, true);
+ runSQL(sqlFile, true, false);
+ }
+ }
+
+ @Test
+ public void testSingleExecuteQuery() throws Exception {
+
+ String queryFileName = "src/test/resources/query/sql/query53.sql";
+
+ File sqlFile = new File(queryFileName);
+ String sql = getTextFromFile(sqlFile);
+ IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+
+ executeQuery(kylinConn, queryFileName, sql, true);
+ }
+
+ @Ignore
+ @Test
+ public void testTableauProbing() throws Exception {
+ batchExecuteQuery("src/test/resources/query/tableau_probing");
+ }
+
+ @Test
+ public void testCommonQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql", null, true);
+ }
+
+ @Ignore
+ @Test
+ public void testSimpleQuery() throws Exception {
+ verifyResultRowCount("src/test/resources/query/sql_verifyCount");
+ }
+
+ @Test
+ public void testOrderByQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_orderby", null, true);
+ // FIXME
+ // as of optiq 0.8, we lost metadata type with "order by" clause, e.g. sql_orderby/query01.sql
+ // thus, temporarily the "order by" clause was cross out, and the needSort is set to true
+ // execAndCompQuery("src/test/resources/query/sql_orderby", null, false);
+ }
+
+ @Test
+ public void testLookupQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_lookup", null, true);
+ }
+
+ @Test
+ public void testCachedQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_cache", null, true);
+ }
+
+ @Test
+ public void testDerivedColumnQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_derived", null, true);
+ }
+
+ @Test
+ public void testDistinctCountQuery() throws Exception {
+ batchExecuteQuery("src/test/resources/query/sql_distinct");
+ }
+
+ @Test
+ public void testTableauQuery() throws Exception {
+ batchExecuteQuery("src/test/resources/query/sql_tableau");
+ }
+
+ @Test
+ public void testSubQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_subquery", null, true);
+ }
+
+ @Test
+ public void testCaseWhen() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_casewhen", null, true);
+ }
+
+ @Ignore
+ @Test
+ public void testHiveQuery() throws Exception {
+ execAndCompQuery("src/test/resources/query/sql_hive", null, true);
+ }
+
+ @Test
+ public void testH2Query() throws Exception {
+ this.execQueryUsingH2("src/test/resources/query/sql_orderby", false);
+ }
+
+ @Test
+ public void testInvalidQuery() throws Exception {
+
+ printInfo("-------------------- Test Invalid Query --------------------");
+ String queryFolder = "src/test/resources/query/sql_invalid";
+ List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
+ for (File sqlFile : sqlFiles) {
+ String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+ printInfo("Testing Query " + queryName);
+ String sql = getTextFromFile(sqlFile);
+ IDatabaseConnection cubeConn = new DatabaseConnection(cubeConnection);
+ try {
+ cubeConn.createQueryTable(queryName, sql);
+ } catch (Throwable t) {
+ continue;
+ } finally {
+ cubeConn.close();
+ }
+ throw new IllegalStateException(queryName + " should be error!");
+ }
+ }
+
+ @Test
+ public void testDynamicQuery() throws Exception {
+ execAndCompDynamicQuery("src/test/resources/query/sql_dynamic", null, true);
+ }
+
+ @Ignore("simple query will be supported by ii")
+ @Test
+ public void testLimitEnabled() throws Exception {
+ runSqlFile("src/test/resources/query/sql_optimize/enable-limit01.sql");
+ assertLimitWasEnabled();
+ }
+
+ private void assertLimitWasEnabled() {
+ OLAPContext context = getFirstOLAPContext();
+ assertTrue(context.storageContext.isLimitEnabled());
+ }
+
+ private OLAPContext getFirstOLAPContext() {
+ return OLAPContext.getThreadLocalContexts().iterator().next();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
deleted file mode 100644
index cea8ee2..0000000
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.test;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.query.enumerator.OLAPQuery;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.schema.OLAPSchemaFactory;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.dbunit.database.DatabaseConnection;
-import org.dbunit.database.IDatabaseConnection;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.sql.DriverManager;
-import java.util.List;
-import java.util.Properties;
-
-import static org.junit.Assert.assertTrue;
-
-@Ignore("KylinQueryTest is contained by CombinationTest")
-public class KylinQueryTest extends KylinTestBase {
-
- @BeforeClass
- public static void setUp() throws Exception {
- printInfo("setUp in KylinQueryTest");
- joinType = "left";
-
- setupAll();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- printInfo("tearDown");
- printInfo("Closing connection...");
- clean();
- }
-
- protected static void setupAll() throws Exception {
- //setup env
- HBaseMetadataTestCase.staticCreateTestMetadata();
- config = KylinConfig.getInstanceFromEnv();
-
- //setup cube conn
- File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
- Properties props = new Properties();
- props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10000");
- cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
-
- //setup h2
- h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
- // Load H2 Tables (inner join)
- H2Database h2DB = new H2Database(h2Connection, config);
- h2DB.loadAllTables(joinType);
- }
-
- protected static void clean() {
- if (cubeConnection != null)
- closeConnection(cubeConnection);
- if (h2Connection != null)
- closeConnection(h2Connection);
-
- ObserverEnabler.forceCoprocessorUnset();
- HBaseMetadataTestCase.staticCleanupTestMetadata();
- }
-
- @Ignore("this is only for debug")
- @Test
- public void testTempQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/temp", null, true);
- }
-
- @Test
- public void testSingleRunQuery() throws Exception {
-
- String queryFileName = "src/test/resources/query/sql/query00.sql";
-
- File sqlFile = new File(queryFileName);
- if (sqlFile.exists()) {
- runSQL(sqlFile, true, true);
- runSQL(sqlFile, true, false);
- }
- }
-
- @Test
- public void testSingleExecuteQuery() throws Exception {
-
- String queryFileName = "src/test/resources/query/sql/query53.sql";
-
- File sqlFile = new File(queryFileName);
- String sql = getTextFromFile(sqlFile);
- IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-
- executeQuery(kylinConn, queryFileName, sql, true);
- }
-
- @Ignore
- @Test
- public void testTableauProbing() throws Exception {
- batchExecuteQuery("src/test/resources/query/tableau_probing");
- }
-
- @Test
- public void testCommonQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql", null, true);
- }
-
- @Ignore
- @Test
- public void testSimpleQuery() throws Exception {
- verifyResultRowCount("src/test/resources/query/sql_verifyCount");
- }
-
- @Test
- public void testOrderByQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_orderby", null, true);
- // FIXME
- // as of optiq 0.8, we lost metadata type with "order by" clause, e.g. sql_orderby/query01.sql
- // thus, temporarily the "order by" clause was cross out, and the needSort is set to true
- // execAndCompQuery("src/test/resources/query/sql_orderby", null, false);
- }
-
- @Test
- public void testLookupQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_lookup", null, true);
- }
-
- @Test
- public void testCachedQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_cache", null, true);
- }
-
- @Test
- public void testDerivedColumnQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_derived", null, true);
- }
-
- @Test
- public void testDistinctCountQuery() throws Exception {
- batchExecuteQuery("src/test/resources/query/sql_distinct");
- }
-
- @Test
- public void testTableauQuery() throws Exception {
- batchExecuteQuery("src/test/resources/query/sql_tableau");
- }
-
- @Test
- public void testSubQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_subquery", null, true);
- }
-
- @Test
- public void testCaseWhen() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_casewhen", null, true);
- }
-
- @Ignore
- @Test
- public void testHiveQuery() throws Exception {
- execAndCompQuery("src/test/resources/query/sql_hive", null, true);
- }
-
- @Test
- public void testH2Query() throws Exception {
- this.execQueryUsingH2("src/test/resources/query/sql_orderby", false);
- }
-
- @Test
- public void testInvalidQuery() throws Exception {
-
- printInfo("-------------------- Test Invalid Query --------------------");
- String queryFolder = "src/test/resources/query/sql_invalid";
- List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
- for (File sqlFile : sqlFiles) {
- String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
- printInfo("Testing Query " + queryName);
- String sql = getTextFromFile(sqlFile);
- IDatabaseConnection cubeConn = new DatabaseConnection(cubeConnection);
- try {
- cubeConn.createQueryTable(queryName, sql);
- } catch (Throwable t) {
- continue;
- } finally {
- cubeConn.close();
- }
- throw new IllegalStateException(queryName + " should be error!");
- }
- }
-
- @Test
- public void testDynamicQuery() throws Exception {
- execAndCompDynamicQuery("src/test/resources/query/sql_dynamic", null, true);
- }
-
- @Ignore("simple query will be supported by ii")
- @Test
- public void testLimitEnabled() throws Exception {
- runSqlFile("src/test/resources/query/sql_optimize/enable-limit01.sql");
- assertLimitWasEnabled();
- }
-
- private void assertLimitWasEnabled() {
- OLAPContext context = getFirstOLAPContext();
- assertTrue(context.storageContext.isLimitEnabled());
- }
-
- private OLAPContext getFirstOLAPContext() {
- return OLAPContext.getThreadLocalContexts().iterator().next();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java b/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
new file mode 100644
index 0000000..9bdb510
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
@@ -0,0 +1,245 @@
+package org.apache.kylin.jdbc;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.junit.*;
+
+import java.io.File;
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ */
+public class ITJDBCDriverTest extends HBaseMetadataTestCase {
+
+ private static Server server = null;
+
+ private static String previousSpringProfile = null;
+
+ private static String SPRING_PROFILE_PROPERTY = "spring.profiles.active";
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ previousSpringProfile = System.getProperty(SPRING_PROFILE_PROPERTY);
+ System.setProperty(SPRING_PROFILE_PROPERTY, "testing");
+ staticCreateTestMetadata();
+ startJetty();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ stopJetty();
+ staticCleanupTestMetadata();
+ if (previousSpringProfile == null) {
+ System.clearProperty(SPRING_PROFILE_PROPERTY);
+ } else {
+ System.setProperty(SPRING_PROFILE_PROPERTY, previousSpringProfile);
+ }
+ }
+
+ protected static void stopJetty() throws Exception {
+ if (server != null)
+ server.stop();
+
+ File workFolder = new File("work");
+ if(workFolder.isDirectory() && workFolder.exists()) {
+ FileUtils.deleteDirectory(workFolder);
+ }
+ }
+
+ protected static void startJetty() throws Exception {
+
+ server = new Server(7070);
+
+ WebAppContext context = new WebAppContext();
+ context.setDescriptor("./src/main/webapp/WEB-INF/web.xml");
+ context.setResourceBase("./src/main/webapp");
+ context.setContextPath("/kylin");
+ context.setParentLoaderPriority(true);
+
+ server.setHandler(context);
+
+ server.start();
+
+ }
+
+ protected Connection getConnection() throws Exception {
+
+ Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
+ Properties info = new Properties();
+ info.put("user", "ADMIN");
+ info.put("password", "KYLIN");
+ Connection conn = driver.connect("jdbc:kylin://localhost:7070/default", info);
+
+ return conn;
+ }
+
+ @Test
+ public void testMetadata1() throws Exception {
+
+ // check the JDBC API here: http://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html
+ Connection conn = getConnection();
+
+ // test getSchemas();
+ List<String> schemaList = Lists.newArrayList();
+ DatabaseMetaData dbMetadata = conn.getMetaData();
+ ResultSet resultSet = dbMetadata.getSchemas();
+ while (resultSet.next()) {
+ String schema = resultSet.getString("TABLE_SCHEM");
+ String catalog = resultSet.getString("TABLE_CATALOG");
+
+ System.out.println("Get schema: schema=" + schema + ", catalog=" + catalog);
+ schemaList.add(schema);
+
+ }
+
+ resultSet.close();
+ Assert.assertTrue(schemaList.contains("DEFAULT"));
+ Assert.assertTrue(schemaList.contains("EDW"));
+
+ // test getCatalogs();
+ resultSet = dbMetadata.getCatalogs();
+
+ List<String> catalogList = Lists.newArrayList();
+ while (resultSet.next()) {
+ String catalog = resultSet.getString("TABLE_CAT");
+
+ System.out.println("Get catalog: catalog=" + catalog);
+ catalogList.add(catalog);
+
+ }
+ Assert.assertTrue(catalogList.size() > 0 && catalogList.contains("defaultCatalog"));
+
+ /** //Disable the test on getTableTypes() as it is not ready
+ resultSet = dbMetadata.getTableTypes();
+
+ List<String> tableTypes = Lists.newArrayList();
+ while (resultSet.next()) {
+ String type = resultSet.getString("TABLE_TYPE");
+
+ System.out.println("Get table type: type=" + type);
+ tableTypes.add(type);
+
+ }
+
+ Assert.assertTrue(tableTypes.size() > 0 && tableTypes.contains("TABLE"));
+ resultSet.close();
+
+ **/
+ conn.close();
+ }
+
+ @Test
+ public void testMetadata2() throws Exception {
+ Connection conn = getConnection();
+
+ List<String> tableList = Lists.newArrayList();
+ DatabaseMetaData dbMetadata = conn.getMetaData();
+ ResultSet resultSet = dbMetadata.getTables(null, "%", "%", new String[]{"TABLE"});
+ while (resultSet.next()) {
+ String schema = resultSet.getString("TABLE_SCHEM");
+ String name = resultSet.getString("TABLE_NAME");
+
+ System.out.println("Get table: schema=" + schema + ", name=" + name);
+ tableList.add(schema + "." + name);
+
+ }
+
+ resultSet.close();
+ Assert.assertTrue(tableList.contains("DEFAULT.TEST_KYLIN_FACT"));
+
+ resultSet = dbMetadata.getColumns(null, "%", "TEST_KYLIN_FACT", "%");
+
+ List<String> columns = Lists.newArrayList();
+ while (resultSet.next()) {
+ String name = resultSet.getString("COLUMN_NAME");
+ String type = resultSet.getString("TYPE_NAME");
+
+ System.out.println("Get column: name=" + name + ", data_type=" + type);
+ columns.add(name);
+
+ }
+
+ Assert.assertTrue(columns.size() > 0 && columns.contains("CAL_DT"));
+ resultSet.close();
+ conn.close();
+ }
+
+ @Test
+ public void testSimpleStatement() throws Exception {
+ Connection conn = getConnection();
+ Statement statement = conn.createStatement();
+
+ statement.execute("select count(*) from test_kylin_fact");
+
+ ResultSet rs = statement.getResultSet();
+
+ Assert.assertTrue(rs.next());
+ int result = rs.getInt(1);
+
+ Assert.assertTrue(result > 0);
+
+ rs.close();
+ statement.close();
+ conn.close();
+
+ }
+
+
+ @Test
+ public void testPreparedStatement() throws Exception {
+ Connection conn = getConnection();
+
+ PreparedStatement statement = conn.prepareStatement("select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact " +
+ "where LSTG_FORMAT_NAME = ? group by LSTG_FORMAT_NAME");
+
+ statement.setString(1, "FP-GTC");
+
+ ResultSet rs = statement.executeQuery();
+
+ Assert.assertTrue(rs.next());
+
+ String format_name = rs.getString(1);
+
+ Assert.assertTrue("FP-GTC".equals(format_name));
+
+ rs.close();
+ statement.close();
+ conn.close();
+
+ }
+
+ @Test
+ public void testResultSet() throws Exception {
+ String sql = "select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact \n" +
+ " group by LSTG_FORMAT_NAME ";
+
+ Connection conn = getConnection();
+ Statement statement = conn.createStatement();
+
+ statement.execute(sql);
+
+ ResultSet rs = statement.getResultSet();
+
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ String lstg = rs.getString(1);
+ double gmv = rs.getDouble(2);
+ int trans_count = rs.getInt(3);
+
+ System.out.println("Get a line: LSTG_FORMAT_NAME=" + lstg + ", GMV=" + gmv + ", TRANS_CNT=" + trans_count);
+ }
+
+ Assert.assertTrue(count > 0);
+ statement.close();
+ rs.close();
+ conn.close();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java b/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java
deleted file mode 100644
index 2eff08f..0000000
--- a/server/src/test/java/org/apache/kylin/jdbc/JDBCDriverTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package org.apache.kylin.jdbc;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.webapp.WebAppContext;
-import org.junit.*;
-
-import java.io.File;
-import java.sql.*;
-import java.util.List;
-import java.util.Properties;
-
-/**
- */
-public class JDBCDriverTest extends HBaseMetadataTestCase {
-
- private static Server server = null;
-
- private static String previousSpringProfile = null;
-
- private static String SPRING_PROFILE_PROPERTY = "spring.profiles.active";
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- previousSpringProfile = System.getProperty(SPRING_PROFILE_PROPERTY);
- System.setProperty(SPRING_PROFILE_PROPERTY, "testing");
- staticCreateTestMetadata();
- startJetty();
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- stopJetty();
- staticCleanupTestMetadata();
- if (previousSpringProfile == null) {
- System.clearProperty(SPRING_PROFILE_PROPERTY);
- } else {
- System.setProperty(SPRING_PROFILE_PROPERTY, previousSpringProfile);
- }
- }
-
- protected static void stopJetty() throws Exception {
- if (server != null)
- server.stop();
-
- File workFolder = new File("work");
- if(workFolder.isDirectory() && workFolder.exists()) {
- FileUtils.deleteDirectory(workFolder);
- }
- }
-
- protected static void startJetty() throws Exception {
-
- server = new Server(7070);
-
- WebAppContext context = new WebAppContext();
- context.setDescriptor("./src/main/webapp/WEB-INF/web.xml");
- context.setResourceBase("./src/main/webapp");
- context.setContextPath("/kylin");
- context.setParentLoaderPriority(true);
-
- server.setHandler(context);
-
- server.start();
-
- }
-
- protected Connection getConnection() throws Exception {
-
- Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
- Properties info = new Properties();
- info.put("user", "ADMIN");
- info.put("password", "KYLIN");
- Connection conn = driver.connect("jdbc:kylin://localhost:7070/default", info);
-
- return conn;
- }
-
- @Test
- public void testMetadata1() throws Exception {
-
- // check the JDBC API here: http://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html
- Connection conn = getConnection();
-
- // test getSchemas();
- List<String> schemaList = Lists.newArrayList();
- DatabaseMetaData dbMetadata = conn.getMetaData();
- ResultSet resultSet = dbMetadata.getSchemas();
- while (resultSet.next()) {
- String schema = resultSet.getString("TABLE_SCHEM");
- String catalog = resultSet.getString("TABLE_CATALOG");
-
- System.out.println("Get schema: schema=" + schema + ", catalog=" + catalog);
- schemaList.add(schema);
-
- }
-
- resultSet.close();
- Assert.assertTrue(schemaList.contains("DEFAULT"));
- Assert.assertTrue(schemaList.contains("EDW"));
-
- // test getCatalogs();
- resultSet = dbMetadata.getCatalogs();
-
- List<String> catalogList = Lists.newArrayList();
- while (resultSet.next()) {
- String catalog = resultSet.getString("TABLE_CAT");
-
- System.out.println("Get catalog: catalog=" + catalog);
- catalogList.add(catalog);
-
- }
- Assert.assertTrue(catalogList.size() > 0 && catalogList.contains("defaultCatalog"));
-
- /** //Disable the test on getTableTypes() as it is not ready
- resultSet = dbMetadata.getTableTypes();
-
- List<String> tableTypes = Lists.newArrayList();
- while (resultSet.next()) {
- String type = resultSet.getString("TABLE_TYPE");
-
- System.out.println("Get table type: type=" + type);
- tableTypes.add(type);
-
- }
-
- Assert.assertTrue(tableTypes.size() > 0 && tableTypes.contains("TABLE"));
- resultSet.close();
-
- **/
- conn.close();
- }
-
- @Test
- public void testMetadata2() throws Exception {
- Connection conn = getConnection();
-
- List<String> tableList = Lists.newArrayList();
- DatabaseMetaData dbMetadata = conn.getMetaData();
- ResultSet resultSet = dbMetadata.getTables(null, "%", "%", new String[]{"TABLE"});
- while (resultSet.next()) {
- String schema = resultSet.getString("TABLE_SCHEM");
- String name = resultSet.getString("TABLE_NAME");
-
- System.out.println("Get table: schema=" + schema + ", name=" + name);
- tableList.add(schema + "." + name);
-
- }
-
- resultSet.close();
- Assert.assertTrue(tableList.contains("DEFAULT.TEST_KYLIN_FACT"));
-
- resultSet = dbMetadata.getColumns(null, "%", "TEST_KYLIN_FACT", "%");
-
- List<String> columns = Lists.newArrayList();
- while (resultSet.next()) {
- String name = resultSet.getString("COLUMN_NAME");
- String type = resultSet.getString("TYPE_NAME");
-
- System.out.println("Get column: name=" + name + ", data_type=" + type);
- columns.add(name);
-
- }
-
- Assert.assertTrue(columns.size() > 0 && columns.contains("CAL_DT"));
- resultSet.close();
- conn.close();
- }
-
- @Test
- public void testSimpleStatement() throws Exception {
- Connection conn = getConnection();
- Statement statement = conn.createStatement();
-
- statement.execute("select count(*) from test_kylin_fact");
-
- ResultSet rs = statement.getResultSet();
-
- Assert.assertTrue(rs.next());
- int result = rs.getInt(1);
-
- Assert.assertTrue(result > 0);
-
- rs.close();
- statement.close();
- conn.close();
-
- }
-
-
- @Test
- public void testPreparedStatement() throws Exception {
- Connection conn = getConnection();
-
- PreparedStatement statement = conn.prepareStatement("select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact " +
- "where LSTG_FORMAT_NAME = ? group by LSTG_FORMAT_NAME");
-
- statement.setString(1, "FP-GTC");
-
- ResultSet rs = statement.executeQuery();
-
- Assert.assertTrue(rs.next());
-
- String format_name = rs.getString(1);
-
- Assert.assertTrue("FP-GTC".equals(format_name));
-
- rs.close();
- statement.close();
- conn.close();
-
- }
-
- @Test
- public void testResultSet() throws Exception {
- String sql = "select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact \n" +
- " group by LSTG_FORMAT_NAME ";
-
- Connection conn = getConnection();
- Statement statement = conn.createStatement();
-
- statement.execute(sql);
-
- ResultSet rs = statement.getResultSet();
-
- int count = 0;
- while (rs.next()) {
- count++;
- String lstg = rs.getString(1);
- double gmv = rs.getDouble(2);
- int trans_count = rs.getInt(3);
-
- System.out.println("Get a line: LSTG_FORMAT_NAME=" + lstg + ", GMV=" + gmv + ", TRANS_CNT=" + trans_count);
- }
-
- Assert.assertTrue(count > 0);
- statement.close();
- rs.close();
- conn.close();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java
new file mode 100644
index 0000000..75defbf
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/controller/ITTableControllerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.rest.service.CubeService;
+import org.apache.kylin.rest.service.ServiceTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author shaoshi
+ */
+public class ITTableControllerTest extends ServiceTestBase {
+
+ private TableController tableController;
+ private CubeDescController cubeDescController;
+
+ @Autowired
+ CubeService cubeService;
+
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+
+ tableController = new TableController();
+ tableController.setCubeService(cubeService);
+ }
+
+ @Test
+ public void testBasics() throws IOException {
+ List<TableDesc> tables = tableController.getHiveTables(true, "default");
+
+ Assert.assertTrue(tables != null && tables.size() > 0);
+
+ TableDesc factTable = null;
+ for (TableDesc t : tables) {
+ if (t.getName().equalsIgnoreCase("test_kylin_fact")) {
+ factTable = t;
+ break;
+ }
+ }
+
+ Assert.assertNotNull(factTable);
+
+
+ Map<String, String[]> loadResult = tableController.loadHiveTable("test_kylin_fact,TEST_CATEGORY_GROUPINGS", "default");
+ Assert.assertNotNull(loadResult);
+
+ Assert.assertTrue(loadResult.get("result.loaded").length ==2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java
deleted file mode 100644
index 4437abe..0000000
--- a/server/src/test/java/org/apache/kylin/rest/controller/TableControllerTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.controller;
-
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.rest.service.CubeService;
-import org.apache.kylin.rest.service.ServiceTestBase;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author shaoshi
- */
-public class TableControllerTest extends ServiceTestBase {
-
- private TableController tableController;
- private CubeDescController cubeDescController;
-
- @Autowired
- CubeService cubeService;
-
- @Before
- public void setup() throws Exception {
- super.setup();
-
- tableController = new TableController();
- tableController.setCubeService(cubeService);
- }
-
- @Test
- public void testBasics() throws IOException {
- List<TableDesc> tables = tableController.getHiveTables(true, "default");
-
- Assert.assertTrue(tables != null && tables.size() > 0);
-
- TableDesc factTable = null;
- for (TableDesc t : tables) {
- if (t.getName().equalsIgnoreCase("test_kylin_fact")) {
- factTable = t;
- break;
- }
- }
-
- Assert.assertNotNull(factTable);
-
-
- Map<String, String[]> loadResult = tableController.loadHiveTable("test_kylin_fact,TEST_CATEGORY_GROUPINGS", "default");
- Assert.assertNotNull(loadResult);
-
- Assert.assertTrue(loadResult.get("result.loaded").length ==2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 559ec89..9f0598d 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -129,6 +129,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
@After
public void after() throws Exception {
+ cleanupTestMetadata();
}
private void waitForCounterAndClear(long count) {
@@ -145,16 +146,6 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
}
}
- @BeforeClass
- public static void startServer() throws Exception {
-
- }
-
- @AfterClass
- public static void stopServer() throws Exception {
-
- }
-
private static CubeManager getCubeManager(KylinConfig config) throws Exception {
return CubeManager.getInstance(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java
new file mode 100644
index 0000000..c92921b
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/ITInvertedIndexHBaseTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * @author yangli9
+ */
+public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+ IIInstance ii;
+ IISegment seg;
+ HConnection hconn;
+
+ TableRecordInfo info;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+
+ this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ this.seg = ii.getFirstSegment();
+
+ String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
+ Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
+ hconn = HConnectionManager.createConnection(hconf);
+
+ this.info = new TableRecordInfo(seg);
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testLoad() throws Exception {
+
+ String tableName = seg.getStorageLocationIdentifier();
+ IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+ List<Slice> slices = Lists.newArrayList();
+ HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES);
+ try {
+ for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+ slices.add(slice);
+ }
+ } finally {
+ kvIterator.close();
+ }
+
+ List<TableRecord> records = iterateRecords(slices);
+ //dump(records);
+ System.out.println(records.size() + " records");
+ }
+
+ private List<TableRecord> iterateRecords(List<Slice> slices) {
+ List<TableRecord> records = Lists.newArrayList();
+ for (Slice slice : slices) {
+ for (RawTableRecord rec : slice) {
+ records.add(new TableRecord((RawTableRecord) rec.clone(), info));
+ }
+ }
+ return records;
+ }
+
+ private void dump(Iterable<TableRecord> records) {
+ for (TableRecord rec : records) {
+ byte[] x = rec.getBytes();
+ String y = BytesUtil.toReadableText(x);
+ System.out.println(y);
+ System.out.println();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
deleted file mode 100644
index fd24487..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
-
- IIInstance ii;
- IISegment seg;
- HConnection hconn;
-
- TableRecordInfo info;
-
- @Before
- public void setup() throws Exception {
- this.createTestMetadata();
-
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
- this.seg = ii.getFirstSegment();
-
- String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
- Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
- hconn = HConnectionManager.createConnection(hconf);
-
- this.info = new TableRecordInfo(seg);
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testLoad() throws Exception {
-
- String tableName = seg.getStorageLocationIdentifier();
- IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-
- List<Slice> slices = Lists.newArrayList();
- HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES);
- try {
- for (Slice slice : codec.decodeKeyValue(kvIterator)) {
- slices.add(slice);
- }
- } finally {
- kvIterator.close();
- }
-
- List<TableRecord> records = iterateRecords(slices);
- //dump(records);
- System.out.println(records.size() + " records");
- }
-
- private List<TableRecord> iterateRecords(List<Slice> slices) {
- List<TableRecord> records = Lists.newArrayList();
- for (Slice slice : slices) {
- for (RawTableRecord rec : slice) {
- records.add(new TableRecord((RawTableRecord) rec.clone(), info));
- }
- }
- return records;
- }
-
- private void dump(Iterable<TableRecord> records) {
- for (TableRecord rec : records) {
- byte[] x = rec.getBytes();
- String y = BytesUtil.toReadableText(x);
- System.out.println(y);
- System.out.println();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
new file mode 100644
index 0000000..1e32ef5
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.test;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
+import org.junit.*;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class ITStorageTest extends HBaseMetadataTestCase {
+
+ private IStorageEngine storageEngine;
+ private CubeInstance cube;
+ private StorageContext context;
+
+ @BeforeClass
+ public static void setupResource() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownResource() {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+
+ CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+ cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
+ Assert.assertNotNull(cube);
+ storageEngine = StorageEngineFactory.getStorageEngine(cube);
+ String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
+ context = new StorageContext();
+ context.setConnUrl(url);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test(expected = ScanOutOfLimitException.class)
+ @Ignore
+ public void testScanOutOfLimit() {
+ context.setThreshold(1);
+ List<TblColRef> groups = StorageMockUtils.buildGroups();
+ List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
+
+ search(groups, aggregations, null, context);
+ }
+
+ @Test
+ public void test01() {
+ List<TblColRef> groups = StorageMockUtils.buildGroups();
+ List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
+ TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
+
+ int count = search(groups, aggregations, filter, context);
+ assertTrue(count > 0);
+ }
+
+ /*
+ @Test
+ public void test02() {
+ List<TblColRef> groups = buildGroups();
+ List<FunctionDesc> aggregations = buildAggregations();
+ TupleFilter filter = buildFilter2(groups.get(1));
+
+ int count = search(groups, aggregations, filter, context);
+ assertTrue(count > 0);
+ }
+
+ @Test
+ public void test03() {
+ List<TblColRef> groups = buildGroups();
+ List<FunctionDesc> aggregations = buildAggregations();
+ TupleFilter filter = buildAndFilter(groups);
+
+ int count = search(groups, aggregations, filter, context);
+ assertTrue(count > 0);
+ }
+
+ @Test
+ public void test04() {
+ List<TblColRef> groups = buildGroups();
+ List<FunctionDesc> aggregations = buildAggregations();
+ TupleFilter filter = buildOrFilter(groups);
+
+ int count = search(groups, aggregations, filter, context);
+ assertTrue(count > 0);
+ }
+
+ @Test
+ public void test05() {
+ List<TblColRef> groups = buildGroups();
+ List<FunctionDesc> aggregations = buildAggregations();
+
+ int count = search(groups, aggregations, null, context);
+ assertTrue(count > 0);
+ }
+ */
+ private int search(List<TblColRef> groups, List<FunctionDesc> aggregations, TupleFilter filter, StorageContext context) {
+ int count = 0;
+ ITupleIterator iterator = null;
+ try {
+ SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+ iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
+ while (iterator.hasNext()) {
+ ITuple tuple = iterator.next();
+ System.out.println("Tuple = " + tuple);
+ count++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (iterator != null)
+ iterator.close();
+ }
+ return count;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
deleted file mode 100644
index f9a6212..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.test;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
-import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
-import org.junit.*;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public class StorageTest extends HBaseMetadataTestCase {
-
- private IStorageEngine storageEngine;
- private CubeInstance cube;
- private StorageContext context;
-
- @BeforeClass
- public static void setupResource() throws Exception {
- }
-
- @AfterClass
- public static void tearDownResource() {
- }
-
- @Before
- public void setUp() throws Exception {
- this.createTestMetadata();
-
- CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
- cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
- Assert.assertNotNull(cube);
- storageEngine = StorageEngineFactory.getStorageEngine(cube);
- String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
- context = new StorageContext();
- context.setConnUrl(url);
- }
-
- @After
- public void tearDown() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test(expected = ScanOutOfLimitException.class)
- @Ignore
- public void testScanOutOfLimit() {
- context.setThreshold(1);
- List<TblColRef> groups = StorageMockUtils.buildGroups();
- List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
-
- search(groups, aggregations, null, context);
- }
-
- @Test
- public void test01() {
- List<TblColRef> groups = StorageMockUtils.buildGroups();
- List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
- TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
-
- int count = search(groups, aggregations, filter, context);
- assertTrue(count > 0);
- }
-
- /*
- @Test
- public void test02() {
- List<TblColRef> groups = buildGroups();
- List<FunctionDesc> aggregations = buildAggregations();
- TupleFilter filter = buildFilter2(groups.get(1));
-
- int count = search(groups, aggregations, filter, context);
- assertTrue(count > 0);
- }
-
- @Test
- public void test03() {
- List<TblColRef> groups = buildGroups();
- List<FunctionDesc> aggregations = buildAggregations();
- TupleFilter filter = buildAndFilter(groups);
-
- int count = search(groups, aggregations, filter, context);
- assertTrue(count > 0);
- }
-
- @Test
- public void test04() {
- List<TblColRef> groups = buildGroups();
- List<FunctionDesc> aggregations = buildAggregations();
- TupleFilter filter = buildOrFilter(groups);
-
- int count = search(groups, aggregations, filter, context);
- assertTrue(count > 0);
- }
-
- @Test
- public void test05() {
- List<TblColRef> groups = buildGroups();
- List<FunctionDesc> aggregations = buildAggregations();
-
- int count = search(groups, aggregations, null, context);
- assertTrue(count > 0);
- }
- */
- private int search(List<TblColRef> groups, List<FunctionDesc> aggregations, TupleFilter filter, StorageContext context) {
- int count = 0;
- ITupleIterator iterator = null;
- try {
- SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
- iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
- while (iterator.hasNext()) {
- ITuple tuple = iterator.next();
- System.out.println("Tuple = " + tuple);
- count++;
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (iterator != null)
- iterator.close();
- }
- return count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
new file mode 100644
index 0000000..3008314
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ */
+public class ITKafkaConsumerTest extends KafkaBaseTest {
+
+ private OneOffStreamProducer producer;
+
+ private static final int TOTAL_SEND_COUNT = 100;
+
+ private KafkaConfig kafkaConfig;
+
+ @Before
+ public void before() throws IOException {
+ producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
+ producer.start();
+ kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ }
+
+ @After
+ public void after() {
+ producer.stop();
+ }
+
+ private void waitForProducerToStop(OneOffStreamProducer producer) {
+ while (!producer.isStopped()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ @Ignore("since ci does not have the topic")
+ public void test() throws InterruptedException {
+ final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+ final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+ List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+ for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
+ KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig);
+ queues.add(consumer.getStreamQueue(0));
+ executorService.execute(consumer);
+ }
+ waitForProducerToStop(producer);
+
+ //wait some time to ensure consumer has fetched all data
+ Thread.sleep(5000);
+ int count = 0;
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ count += queue.size();
+ }
+
+ logger.info("count of messages are " + count);
+ //since there will be historical data
+ assertTrue(count >= TOTAL_SEND_COUNT && (count % TOTAL_SEND_COUNT == 0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
new file mode 100644
index 0000000..d8853f6
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaRequesterTest.java
@@ -0,0 +1,76 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.common.KylinConfig;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ */
+public class ITKafkaRequesterTest extends KafkaBaseTest {
+
+ private static final String NON_EXISTED_TOPIC = "non_existent_topic";
+ private KafkaConfig kafkaConfig;
+
+
+ @Before
+ public void before() {
+ kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ }
+
+ @Test
+ @Ignore("since ci does not enable kafka")
+ public void testTopicMeta() throws Exception {
+ TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
+ assertNotNull(kafkaTopicMeta);
+ assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
+ assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
+
+ KafkaConfig anotherTopicConfig = kafkaConfig.clone();
+ anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
+
+ kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherTopicConfig);
+ assertTrue(kafkaTopicMeta == null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
deleted file mode 100644
index c782dae..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.KylinConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- */
-public class KafkaConsumerTest extends KafkaBaseTest {
-
- private OneOffStreamProducer producer;
-
- private static final int TOTAL_SEND_COUNT = 100;
-
- private KafkaConfig kafkaConfig;
-
- @Before
- public void before() throws IOException {
- producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
- producer.start();
- kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
- }
-
- @After
- public void after() {
- producer.stop();
- }
-
- private void waitForProducerToStop(OneOffStreamProducer producer) {
- while (!producer.isStopped()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Test
- @Ignore("since ci does not have the topic")
- public void test() throws InterruptedException {
- final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
- final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
- List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
- for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
- KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig);
- queues.add(consumer.getStreamQueue(0));
- executorService.execute(consumer);
- }
- waitForProducerToStop(producer);
-
- //wait some time to ensure consumer has fetched all data
- Thread.sleep(5000);
- int count = 0;
- for (BlockingQueue<StreamMessage> queue : queues) {
- count += queue.size();
- }
-
- logger.info("count of messages are " + count);
- //since there will be historical data
- assertTrue(count >= TOTAL_SEND_COUNT && (count % TOTAL_SEND_COUNT == 0));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/42f124fb/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
deleted file mode 100644
index 9d99825..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import org.apache.kylin.common.KylinConfig;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- */
-public class KafkaRequesterTest extends KafkaBaseTest {
-
- private static final String NON_EXISTED_TOPIC = "non_existent_topic";
- private KafkaConfig kafkaConfig;
-
-
- @Before
- public void before() {
- kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
- }
-
- @AfterClass
- public static void afterClass() {
- }
-
- @Test
- @Ignore("since ci does not enable kafka")
- public void testTopicMeta() throws Exception {
- TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaConfig);
- assertNotNull(kafkaTopicMeta);
- assertEquals(2, kafkaTopicMeta.getPartitionIds().size());
- assertEquals(kafkaConfig.getTopic(), kafkaTopicMeta.getName());
-
- KafkaConfig anotherTopicConfig = kafkaConfig.clone();
- anotherTopicConfig.setTopic(NON_EXISTED_TOPIC);
-
- kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(anotherTopicConfig);
- assertTrue(kafkaTopicMeta == null);
- }
-}