You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 02:27:53 UTC
[04/12] incubator-kylin git commit: KYLIN-1010 Decompose project job
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
deleted file mode 100644
index 8c92f87..0000000
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
-import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.TimedJsonStreamParser;
-import org.apache.maven.model.Model;
-import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class DeployUtil {
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
-
- public static void initCliWorkDir() throws IOException {
- execCliCommand("rm -rf " + getHadoopCliWorkingDir());
- execCliCommand("mkdir -p " + config().getKylinJobLogDir());
- }
-
- public static void deployMetadata() throws IOException {
- // install metadata to hbase
- ResourceTool.reset(config());
- ResourceTool.copy(KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA), config());
-
- // update cube desc signature.
- for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
- cube.getDescriptor().setSignature(cube.getDescriptor().calculateSignature());
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- CubeManager.getInstance(config()).updateCube(cubeBuilder);
- }
- }
-
- public static void overrideJobJarLocations() {
- File jobJar = getJobJarFile();
- File coprocessorJar = getCoprocessorJarFile();
-
- config().overrideMRJobJarPath(jobJar.getAbsolutePath());
- config().overrideCoprocessorLocalJar(coprocessorJar.getAbsolutePath());
- config().overrideSparkJobJarPath(getSparkJobJarFile().getAbsolutePath());
- }
-
- private static String getPomVersion() {
- try {
- MavenXpp3Reader pomReader = new MavenXpp3Reader();
- Model model = pomReader.read(new FileReader("../pom.xml"));
- return model.getVersion();
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- private static File getJobJarFile() {
- return new File("../job/target", "kylin-job-" + getPomVersion() + "-job.jar");
- }
-
- private static File getCoprocessorJarFile() {
- return new File("../storage-hbase/target", "kylin-storage-hbase-" + getPomVersion() + "-coprocessor.jar");
- }
-
- private static File getSparkJobJarFile() {
- return new File("../engine-spark/target", "kylin-engine-spark-" + getPomVersion() + "-job.jar");
- }
-
- private static void execCliCommand(String cmd) throws IOException {
- config().getCliCommandExecutor().execute(cmd);
- }
-
- private static String getHadoopCliWorkingDir() {
- return config().getCliWorkingDir();
- }
-
- private static KylinConfig config() {
- return KylinConfig.getInstanceFromEnv();
- }
-
- // ============================================================================
-
- static final String TABLE_CAL_DT = "edw.test_cal_dt";
- static final String TABLE_CATEGORY_GROUPINGS = "default.test_category_groupings";
- static final String TABLE_KYLIN_FACT = "default.test_kylin_fact";
- static final String TABLE_SELLER_TYPE_DIM = "edw.test_seller_type_dim";
- static final String TABLE_SITES = "edw.test_sites";
-
- static final String[] TABLE_NAMES = new String[] { TABLE_CAL_DT, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, TABLE_SELLER_TYPE_DIM, TABLE_SITES };
-
- public static void prepareTestDataForNormalCubes(String cubeName) throws Exception {
-
- String factTableName = TABLE_KYLIN_FACT.toUpperCase();
- String content = null;
-
- boolean buildCubeUsingProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData"));
- if (!buildCubeUsingProvidedData) {
- System.out.println("build cube with random dataset");
- // data is generated according to cube descriptor and saved in resource store
- content = FactTableGenerator.generate(cubeName, "10000", "0.6", null);
- assert content != null;
- overrideFactTableData(content, factTableName);
- } else {
- System.out.println("build normal cubes with provided dataset");
- }
-
- deployHiveTables();
- }
-
- public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
- CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
- List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
- TableDesc tableDesc = cubeInstance.getFactTableDesc();
-
- //load into kafka
- KafkaDataLoader.loadIntoKafka(streamingConfig, data);
- logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic());
-
- //csv data for H2 use
- List<TblColRef> tableColumns = Lists.newArrayList();
- for (ColumnDesc columnDesc : tableDesc.getColumns()) {
- tableColumns.add(new TblColRef(columnDesc));
- }
- TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
- StringBuilder sb = new StringBuilder();
- for (String json : data) {
- List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
- sb.append(StringUtils.join(rowColumns, ","));
- sb.append(System.getProperty("line.separator"));
- }
- overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
- }
-
- public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
- // Write to resource store
- ResourceStore store = ResourceStore.getStore(config());
-
- InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"));
- String factTablePath = "/data/" + factTableName + ".csv";
- store.deleteResource(factTablePath);
- store.putResource(factTablePath, in, System.currentTimeMillis());
- in.close();
- }
-
- private static void deployHiveTables() throws Exception {
-
- MetadataManager metaMgr = MetadataManager.getInstance(config());
-
- // scp data files, use the data from hbase, instead of local files
- File temp = File.createTempFile("temp", ".csv");
- temp.createNewFile();
- for (String tablename : TABLE_NAMES) {
- tablename = tablename.toUpperCase();
-
- File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
- localBufferFile.createNewFile();
-
- InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv");
- FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
- IOUtils.copy(hbaseDataStream, localFileStream);
-
- hbaseDataStream.close();
- localFileStream.close();
-
- localBufferFile.deleteOnExit();
- }
- String tableFileDir = temp.getParent();
- temp.delete();
-
- HiveClient hiveClient = new HiveClient();
-
- // create hive tables
- hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW");
- hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CAL_DT.toUpperCase())));
- hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CATEGORY_GROUPINGS.toUpperCase())));
- hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_KYLIN_FACT.toUpperCase())));
- hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_SELLER_TYPE_DIM.toUpperCase())));
- hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_SITES.toUpperCase())));
-
- // load data to hive tables
- // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
- hiveClient.executeHQL(generateLoadDataHql(TABLE_CAL_DT, tableFileDir));
- hiveClient.executeHQL(generateLoadDataHql(TABLE_CATEGORY_GROUPINGS, tableFileDir));
- hiveClient.executeHQL(generateLoadDataHql(TABLE_KYLIN_FACT, tableFileDir));
- hiveClient.executeHQL(generateLoadDataHql(TABLE_SELLER_TYPE_DIM, tableFileDir));
- hiveClient.executeHQL(generateLoadDataHql(TABLE_SITES, tableFileDir));
- }
-
- private static String generateLoadDataHql(String tableName, String tableFileDir) {
- return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName.toUpperCase() + ".csv' OVERWRITE INTO TABLE " + tableName.toUpperCase();
- }
-
- private static String[] generateCreateTableHql(TableDesc tableDesc) {
-
- String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
- StringBuilder ddl = new StringBuilder();
-
- ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
- ddl.append("(" + "\n");
-
- for (int i = 0; i < tableDesc.getColumns().length; i++) {
- ColumnDesc col = tableDesc.getColumns()[i];
- if (i > 0) {
- ddl.append(",");
- }
- ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n");
- }
-
- ddl.append(")" + "\n");
- ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
- ddl.append("STORED AS TEXTFILE");
-
- return new String[] { dropsql, ddl.toString() };
- }
-
- private static String getHiveDataType(String javaDataType) {
- String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
- hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
-
- return hiveDataType.toLowerCase();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
deleted file mode 100644
index 2d8bb05..0000000
--- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.SSHClient;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.storage.hbase.steps.HBaseConnection;
-import org.apache.kylin.storage.hbase.steps.HBaseMiniclusterHelper;
-
-public class ExportHBaseData {
-
- KylinConfig kylinConfig;
- HTableDescriptor[] allTables;
- Configuration config;
- HBaseAdmin hbase;
- CliCommandExecutor cli;
- String exportHdfsFolder;
- String exportLocalFolderParent;
- String exportLocalFolder;
- String backupArchive;
- String tableNameBase;
- long currentTIME;
-
- public ExportHBaseData() {
- try {
- setup();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void setup() throws IOException {
-
- KylinConfig.destoryInstance();
- System.setProperty(KylinConfig.KYLIN_CONF, AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- cli = kylinConfig.getCliCommandExecutor();
-
- currentTIME = System.currentTimeMillis();
- exportHdfsFolder = kylinConfig.getHdfsWorkingDirectory() + "hbase-export/" + currentTIME + "/";
- exportLocalFolderParent = BatchConstants.CFG_KYLIN_LOCAL_TEMP_DIR + "hbase-export/";
- exportLocalFolder = exportLocalFolderParent + currentTIME + "/";
- backupArchive = exportLocalFolderParent + "hbase-export-at-" + currentTIME + ".tar.gz";
-
- String metadataUrl = kylinConfig.getMetadataUrl();
- // split TABLE@HBASE_URL
- int cut = metadataUrl.indexOf('@');
- tableNameBase = metadataUrl.substring(0, cut);
- String hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
- HConnection conn = HBaseConnection.get(hbaseUrl);
- try {
- hbase = new HBaseAdmin(conn);
- config = hbase.getConfiguration();
- allTables = hbase.listTables();
- } catch (IOException e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- public void tearDown() {
-
- // cleanup hdfs
- try {
- if (cli != null && exportHdfsFolder != null) {
- cli.execute("hadoop fs -rm -r " + exportHdfsFolder);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- // cleanup sandbox disk
- try {
- if (cli != null && exportLocalFolder != null) {
- cli.execute("rm -r " + exportLocalFolder);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- // delete archive file on sandbox
- try {
- if (cli != null && backupArchive != null) {
- cli.execute("rm " + backupArchive);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void exportTables() throws IOException {
- cli.execute("mkdir -p " + exportLocalFolderParent);
-
- for (HTableDescriptor table : allTables) {
- String tName = table.getNameAsString();
- if (!tName.equals(tableNameBase) && !tName.startsWith(HBaseMiniclusterHelper.SHARED_STORAGE_PREFIX))
- continue;
-
- cli.execute("hbase org.apache.hadoop.hbase.mapreduce.Export " + tName + " " + exportHdfsFolder + tName);
- }
-
- cli.execute("hadoop fs -copyToLocal " + exportHdfsFolder + " " + exportLocalFolderParent);
- cli.execute("tar -zcvf " + backupArchive + " --directory=" + exportLocalFolderParent + " " + currentTIME);
- downloadToLocal();
- }
-
- public void downloadToLocal() throws IOException {
- String localArchive = "../examples/test_case_data/minicluster/hbase-export.tar.gz";
-
- if (kylinConfig.getRunAsRemoteCommand()) {
- SSHClient ssh = new SSHClient(kylinConfig.getRemoteHadoopCliHostname(), kylinConfig.getRemoteHadoopCliPort(), kylinConfig.getRemoteHadoopCliUsername(), kylinConfig.getRemoteHadoopCliPassword());
- try {
- ssh.scpFileToLocal(backupArchive, localArchive);
- } catch (Exception e) {
- e.printStackTrace();
- }
- } else {
- FileUtils.copyFile(new File(backupArchive), new File(localArchive));
- }
- }
-
- public static void main(String[] args) {
- ExportHBaseData export = new ExportHBaseData();
- try {
- export.exportTables();
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- export.tearDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
deleted file mode 100644
index 6a615cb..0000000
--- a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-@Ignore("this test case will break existing metadata store")
-public class ITKafkaBasedIIStreamBuilderTest {
-
- private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
-
- private KylinConfig kylinConfig;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- }
-
- @Test
- public void test() throws Exception {
- final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
- bootstrap.start("eagle", 0);
- Thread.sleep(30 * 60 * 1000);
- logger.info("time is up, stop streaming");
- bootstrap.stop();
- Thread.sleep(5 * 1000);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java b/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
deleted file mode 100644
index 44ba8f4..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dataGen;
-
-import java.util.ArrayList;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ColumnConfig {
- @JsonProperty("columnName")
- private String columnName;
- @JsonProperty("valueSet")
- private ArrayList<String> valueSet;
- @JsonProperty("exclusive")
- private boolean exclusive;
- @JsonProperty("asRange")
- private boolean asRange;
-
- public boolean isAsRange() {
- return asRange;
- }
-
- public void setAsRange(boolean asRange) {
- this.asRange = asRange;
- }
-
- public boolean isExclusive() {
- return exclusive;
- }
-
- public void setExclusive(boolean exclusive) {
- this.exclusive = exclusive;
- }
-
- public String getColumnName() {
- return columnName;
- }
-
- public void setColumnName(String columnName) {
- this.columnName = columnName;
- }
-
- public ArrayList<String> getValueSet() {
- return valueSet;
- }
-
- public void setValueSet(ArrayList<String> valueSet) {
- this.valueSet = valueSet;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
deleted file mode 100644
index a965753..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dataGen;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- */
-public class FactTableGenerator {
- CubeInstance cube = null;
- CubeDesc desc = null;
- ResourceStore store = null;
- String factTableName = null;
-
- GenConfig genConf = null;
-
- Random r = null;
-
- String cubeName;
- long randomSeed;
- int rowCount;
- int unlinkableRowCount;
- int unlinkableRowCountMax;
- double conflictRatio;
- double linkableRatio;
-
- // the names of lookup table columns which is in relation with fact
- // table(appear as fk in fact table)
- TreeMap<String, LinkedList<String>> lookupTableKeys = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
- // possible values of lookupTableKeys, extracted from existing lookup
- // tables.
- // The key is in the format of tablename/columnname
- TreeMap<String, ArrayList<String>> feasibleValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
- // lookup table name -> sets of all composite keys
- TreeMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
- private void init(String cubeName, int rowCount, double conflictRaio, double linkableRatio, long randomSeed) {
- this.rowCount = rowCount;
- this.conflictRatio = conflictRaio;
- this.cubeName = cubeName;
- this.randomSeed = randomSeed;
- this.linkableRatio = linkableRatio;
-
- this.unlinkableRowCountMax = (int) (this.rowCount * (1 - linkableRatio));
- this.unlinkableRowCount = 0;
-
- r = new Random(randomSeed);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- cube = CubeManager.getInstance(config).getCube(cubeName);
- desc = cube.getDescriptor();
- factTableName = desc.getFactTable();
- store = ResourceStore.getStore(config);
- }
-
- /*
- * users can specify the value preference for each column
- */
- private void loadConfig() {
- try {
- InputStream configStream = null;
- configStream = store.getResource("/data/data_gen_config.json");
- this.genConf = GenConfig.loadConfig(configStream);
-
- if (configStream != null)
- configStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- // only deal with composite keys
- if (columnNames.size() > 1 && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
- lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
- }
-
- InputStream tableStream = null;
- BufferedReader tableReader = null;
- try {
- TreeMap<String, Integer> zeroBasedInice = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
- for (String columnName : columnNames) {
- ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
- zeroBasedInice.put(columnName, cDesc.getZeroBasedIndex());
- }
-
- String path = "/data/" + lookupTableName + ".csv";
- tableStream = store.getResource(path);
- tableReader = new BufferedReader(new InputStreamReader(tableStream));
- tableReader.mark(0);
- int rowCount = 0;
- int curRowNum = 0;
- String curRow;
-
- while (tableReader.readLine() != null)
- rowCount++;
-
- HashSet<Integer> rows = new HashSet<Integer>();
- distinctRowCount = (distinctRowCount < rowCount) ? distinctRowCount : rowCount;
- while (rows.size() < distinctRowCount) {
- rows.add(r.nextInt(rowCount));
- }
-
- // reopen the stream
- tableReader.close();
- tableStream.close();
- tableStream = null;
- tableReader = null;
-
- tableStream = store.getResource(path);
- tableReader = new BufferedReader(new InputStreamReader(tableStream));
-
- while ((curRow = tableReader.readLine()) != null) {
- if (rows.contains(curRowNum)) {
- String[] tokens = curRow.split(",");
-
- String[] comboKeys = null;
- int index = 0;
- if (columnNames.size() > 1)
- comboKeys = new String[columnNames.size()];
-
- for (String columnName : columnNames) {
- int zeroBasedIndex = zeroBasedInice.get(columnName);
- if (!feasibleValues.containsKey(lookupTableName + "/" + columnName))
- feasibleValues.put(lookupTableName + "/" + columnName, new ArrayList<String>());
- feasibleValues.get(lookupTableName + "/" + columnName).add(tokens[zeroBasedIndex]);
-
- if (columnNames.size() > 1) {
- comboKeys[index] = tokens[zeroBasedIndex];
- index++;
- }
- }
-
- if (columnNames.size() > 1) {
- Array<String> wrap = new Array<String>(comboKeys);
- if (lookupTableCompositeKeyValues.get(lookupTableName).contains(wrap)) {
- throw new Exception("The composite key already exist in the lookup table");
- }
- lookupTableCompositeKeyValues.get(lookupTableName).add(wrap);
- }
- }
- curRowNum++;
- }
-
- if (tableStream != null)
- tableStream.close();
- if (tableReader != null)
- tableReader.close();
-
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(1);
- }
- }
-
- // prepare the candidate values for each joined column
- private void prepare() throws Exception {
- // load config
- loadConfig();
-
- TreeSet<String> factTableColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
- for (DimensionDesc dim : desc.getDimensions()) {
- for (TblColRef col : dim.getColumnRefs()) {
- if (col.getTable().equals(factTableName))
- factTableColumns.add(col.getName());
- }
-
- JoinDesc join = dim.getJoin();
- if (join != null) {
- String lookupTable = dim.getTable();
- for (String column : join.getPrimaryKey()) {
- if (!lookupTableKeys.containsKey(lookupTable)) {
- lookupTableKeys.put(lookupTable, new LinkedList<String>());
- }
-
- if (!lookupTableKeys.get(lookupTable).contains(column))
- lookupTableKeys.get(lookupTable).add(column);
- }
- }
- }
-
- int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
- distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
- // lookup tables
- for (String lookupTable : lookupTableKeys.keySet()) {
- this.loadLookupTableValues(lookupTable, lookupTableKeys.get(lookupTable), distinctRowCount);
- }
- }
-
- private List<DimensionDesc> getSortedDimentsionDescs() {
- List<DimensionDesc> dimensions = desc.getDimensions();
- Collections.sort(dimensions, new Comparator<DimensionDesc>() {
- @Override
- public int compare(DimensionDesc o1, DimensionDesc o2) {
- JoinDesc j1 = o2.getJoin();
- JoinDesc j2 = o1.getJoin();
- return Integer.valueOf(j1 != null ? j1.getPrimaryKey().length : 0).compareTo(j2 != null ? j2.getPrimaryKey().length : 0);
- }
- });
- return dimensions;
- }
-
- /**
- * Generate the fact table and return it as text
- *
- * @return
- * @throws Exception
- */
- private String cookData() throws Exception {
- // the columns on the fact table can be classified into three groups:
- // 1. foreign keys
- TreeMap<String, String> factTableCol2LookupCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
- // 2. metrics or directly used dimensions
- TreeSet<String> usedCols = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
- // 3. others, not referenced anywhere
-
- TreeMap<String, String> lookupCol2factTableCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
- // find fact table columns in fks
- List<DimensionDesc> dimensions = getSortedDimentsionDescs();
- for (DimensionDesc dim : dimensions) {
- JoinDesc jDesc = dim.getJoin();
- if (jDesc != null) {
- String[] fks = jDesc.getForeignKey();
- String[] pks = jDesc.getPrimaryKey();
- int num = fks.length;
- for (int i = 0; i < num; ++i) {
- String value = dim.getTable() + "/" + pks[i];
-
- lookupCol2factTableCol.put(value, fks[i]);
-
- if (factTableCol2LookupCol.containsKey(fks[i])) {
- if (!factTableCol2LookupCol.get(fks[i]).equals(value)) {
- System.out.println("Warning: Disambiguation on the mapping of column " + fks[i] + ", " + factTableCol2LookupCol.get(fks[i]) + "(chosen) or " + value);
- continue;
- }
- }
- factTableCol2LookupCol.put(fks[i], value);
- }
- }
- //else, deal with it in next roung
- }
-
- // find fact table columns in direct dimension
- // DO NOT merge this with the previous loop
- for (DimensionDesc dim : dimensions) {
- JoinDesc jDesc = dim.getJoin();
- if (jDesc == null) {
- // column on fact table used directly as a dimension
- for (String aColumn : dim.getColumn()) {
- if (!factTableCol2LookupCol.containsKey(aColumn))
- usedCols.add(aColumn);
- }
- }
- }
-
- // find fact table columns in measures
- for (MeasureDesc mDesc : desc.getMeasures()) {
- List<TblColRef> pcols = mDesc.getFunction().getParameter().getColRefs();
- if (pcols != null) {
- for (TblColRef col : pcols) {
- if (!factTableCol2LookupCol.containsKey(col.getName()))
- usedCols.add(col.getName());
- }
- }
- }
-
- return createTable(this.rowCount, factTableCol2LookupCol, lookupCol2factTableCol, usedCols);
- }
-
- private String normToTwoDigits(int v) {
- if (v < 10)
- return "0" + v;
- else
- return Integer.toString(v);
- }
-
- private String randomPick(ArrayList<String> candidates) {
- int index = r.nextInt(candidates.size());
- return candidates.get(index);
- }
-
- private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
- DataType type = cDesc.getType();
- if (type.isStringFamily()) {
- throw new Exception("Can't handle range values for string");
-
- } else if (type.isIntegerFamily()) {
- int low = Integer.parseInt(range.get(0));
- int high = Integer.parseInt(range.get(1));
- return Integer.toString(r.nextInt(high - low) + low);
-
- } else if (type.isDouble()) {
- double low = Double.parseDouble(range.get(0));
- double high = Double.parseDouble(range.get(1));
- return String.format("%.4f", r.nextDouble() * (high - low) + low);
-
- } else if (type.isFloat()) {
- float low = Float.parseFloat(range.get(0));
- float high = Float.parseFloat(range.get(1));
- return String.format("%.4f", r.nextFloat() * (high - low) + low);
-
- } else if (type.isDecimal()) {
- double low = Double.parseDouble(range.get(0));
- double high = Double.parseDouble(range.get(1));
- return String.format("%.4f", r.nextDouble() * (high - low) + low);
-
- } else if (type.isDateTimeFamily()) {
- if (!type.isDate()) {
- throw new RuntimeException("Does not support " + type);
- }
-
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- Date start = format.parse(range.get(0));
- Date end = format.parse(range.get(1));
- long diff = end.getTime() - start.getTime();
- Date temp = new Date(start.getTime() + (long) (diff * r.nextDouble()));
- Calendar cal = Calendar.getInstance();
- cal.setTime(temp);
- // first day
- cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
-
- return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
- } else {
- System.out.println("The data type " + type + "is not recognized");
- System.exit(1);
- }
- return null;
- }
-
- private String createRandomCell(ColumnDesc cDesc) {
- String type = cDesc.getTypeName();
- String s = type.toLowerCase();
- if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 2; i++) {
- sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
- // possible strings
- }
- return sb.toString();
- } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
- return Integer.toString(r.nextInt(128));
- } else if (s.equals("double")) {
- return String.format("%.4f", r.nextDouble() * 100);
- } else if (s.equals("float")) {
- return String.format("%.4f", r.nextFloat() * 100);
- } else if (s.equals("decimal")) {
- return String.format("%.4f", r.nextDouble() * 100);
- } else if (s.equals("date")) {
- long date20131231 = 61349312153265L;
- long date20010101 = 60939158400000L;
- long diff = date20131231 - date20010101;
- Date temp = new Date(date20010101 + (long) (diff * r.nextDouble()));
- Calendar cal = Calendar.getInstance();
- cal.setTime(temp);
- // first day
- cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
-
- return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
- } else {
- System.out.println("The data type " + type + "is not recognized");
- System.exit(1);
- }
- return null;
- }
-
- private String createDefaultsCell(String type) {
- String s = type.toLowerCase();
- if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
- return "abcde";
- } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
- return "0";
- } else if (s.equals("double")) {
- return "0";
- } else if (s.equals("float")) {
- return "0";
- } else if (s.equals("decimal")) {
- return "0";
- } else if (s.equals("date")) {
- return "1970-01-01";
- } else {
- System.out.println("The data type " + type + "is not recognized");
- System.exit(1);
- }
- return null;
- }
-
- private void printColumnMappings(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) {
-
- System.out.println("=======================================================================");
- System.out.format("%-30s %s", "FACT_TABLE_COLUMN", "MAPPING");
- System.out.println();
- System.out.println();
- for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
- System.out.format("%-30s %s", entry.getKey(), entry.getValue());
- System.out.println();
- }
- for (String key : usedCols) {
- System.out.format("%-30s %s", key, "Random Values");
- System.out.println();
- }
- for (String key : defaultColumns) {
- System.out.format("%-30s %s", key, "Default Values");
- System.out.println();
- }
- System.out.println("=======================================================================");
-
- System.out.println("Parameters:");
- System.out.println();
- System.out.println("CubeName: " + cubeName);
- System.out.println("RowCount: " + rowCount);
- System.out.println("ConflictRatio: " + conflictRatio);
- System.out.println("LinkableRatio: " + linkableRatio);
- System.out.println("Seed: " + randomSeed);
- System.out.println();
- System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
- System.out.println("You can vary the above parameters to generate different datasets.");
- System.out.println();
- }
-
- // Any row in the column must finally appear in the flatten big table.
- // for single-column joins the generated row is guaranteed to have a match
- // in lookup table
- // for composite keys we'll need an extra check
- private boolean matchAllCompositeKeys(TreeMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- for (String lookupTable : lookupTableKeys.keySet()) {
- if (lookupTableKeys.get(lookupTable).size() == 1)
- continue;
-
- String[] comboKey = new String[lookupTableKeys.get(lookupTable).size()];
- int index = 0;
- for (String column : lookupTableKeys.get(lookupTable)) {
- String key = lookupTable + "/" + column;
- String factTableCol = lookupCol2FactTableCol.get(key);
- int cardinal = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol).getZeroBasedIndex();
- comboKey[index] = columnValues.get(cardinal);
-
- index++;
- }
- Array<String> wrap = new Array<String>(comboKey);
- if (!lookupTableCompositeKeyValues.get(lookupTable).contains(wrap)) {
- // System.out.println("Try " + wrap + " Failed, continue...");
- return false;
- }
- }
- return true;
- }
-
- private String createCell(ColumnDesc cDesc) throws Exception {
- ColumnConfig cConfig = null;
-
- if ((cConfig = genConf.getColumnConfigByName(cDesc.getName())) == null) {
- // if the column is not configured, use random values
- return (createRandomCell(cDesc));
-
- } else {
- // the column has a configuration
- if (!cConfig.isAsRange() && !cConfig.isExclusive() && r.nextBoolean()) {
- // if the column still allows random values
- return (createRandomCell(cDesc));
-
- } else {
- // use specified values
- ArrayList<String> valueSet = cConfig.getValueSet();
- if (valueSet == null || valueSet.size() == 0)
- throw new Exception("Did you forget to specify value set for " + cDesc.getName());
-
- if (!cConfig.isAsRange()) {
- return (randomPick(valueSet));
- } else {
- if (valueSet.size() != 2)
- throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
-
- return (createRandomCell(cDesc, valueSet));
- }
- }
-
- }
- }
-
- private LinkedList<String> createRow(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) throws Exception {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- LinkedList<String> columnValues = new LinkedList<String>();
-
- for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
-
- String colName = cDesc.getName();
-
- if (factTableCol2LookupCol.containsKey(colName)) {
-
- // if the current column is a fk column in fact table
- ArrayList<String> candidates = this.feasibleValues.get(factTableCol2LookupCol.get(colName));
-
- columnValues.add(candidates.get(r.nextInt(candidates.size())));
- } else if (usedCols.contains(colName)) {
-
- // if the current column is a metric column in fact table
- columnValues.add(createCell(cDesc));
- } else {
-
- // otherwise this column is not useful in OLAP
- columnValues.add(createDefaultsCell(cDesc.getTypeName()));
- defaultColumns.add(colName);
- }
- }
-
- return columnValues;
- }
-
- /**
- * return the text of table contents(one line one row)
- *
- * @param rowCount
- * @param factTableCol2LookupCol
- * @param lookupCol2FactTableCol
- * @param usedCols
- * @return
- * @throws Exception
- */
- private String createTable(int rowCount, TreeMap<String, String> factTableCol2LookupCol, TreeMap<String, String> lookupCol2FactTableCol, TreeSet<String> usedCols) throws Exception {
- try {
- TreeSet<String> defaultColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < rowCount;) {
-
- LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
-
- if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
- if (unlinkableRowCount < unlinkableRowCountMax) {
- unlinkableRowCount++;
- } else {
- continue;
- }
- }
-
- for (String c : columnValues)
- sb.append(c + ",");
- sb.deleteCharAt(sb.length() - 1);
- sb.append(System.getProperty("line.separator"));
-
- i++;
-
- // System.out.println("Just generated the " + i + "th record");
- }
-
- printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
-
- return sb.toString();
-
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(1);
- }
-
- return null;
- }
-
- /**
- * Randomly create a fact table and return the table content
- *
- * @param cubeName name of the cube
- * @param rowCount expected row count generated
- * @param linkableRatio the percentage of fact table rows that can be linked with all
- * lookup table by INNER join
- * @param randomSeed random seed
- */
- public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed) throws Exception {
-
- if (rowCount == null)
- rowCount = "10000";
- if (linkableRatio == null)
- linkableRatio = "0.6";
-
- //if (randomSeed == null)
- // don't give it value
-
- // String conflictRatio = "5";//this parameter do not allow configuring
- // any more
-
- FactTableGenerator generator = new FactTableGenerator();
- long seed;
- if (randomSeed != null) {
- seed = Long.parseLong(randomSeed);
- } else {
- Random r = new Random();
- seed = r.nextLong();
- }
-
- generator.init(cubeName, Integer.parseInt(rowCount), 5, Double.parseDouble(linkableRatio), seed);
- generator.prepare();
- return generator.cookData();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java b/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
deleted file mode 100644
index c58cfb6..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dataGen;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.kylin.common.util.JsonUtil;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class GenConfig {
-
- @JsonProperty("columnConfigs")
- private ArrayList<ColumnConfig> columnConfigs;
-
- private HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
-
- public ArrayList<ColumnConfig> getColumnConfigs() {
- return columnConfigs;
- }
-
- public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
- this.columnConfigs = columnConfigs;
- }
-
- public ColumnConfig getColumnConfigByName(String columnName) {
- columnName = columnName.toLowerCase();
-
- if (cache.containsKey(columnName))
- return cache.get(columnName);
-
- for (ColumnConfig cConfig : columnConfigs) {
- if (cConfig.getColumnName().toLowerCase().equals(columnName)) {
- cache.put(columnName, cConfig);
- return cConfig;
- }
- }
- cache.put(columnName, null);
- return null;
- }
-
- public static GenConfig loadConfig(InputStream stream) {
- try {
- GenConfig config = JsonUtil.readValue(stream, GenConfig.class);
- return config;
- } catch (JsonMappingException e) {
- e.printStackTrace();
- } catch (JsonParseException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
deleted file mode 100644
index ebc4114..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.kylin.job.dataGen;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-/**
- * data gen for II streaming, may be merged with StreamingTableDataGenerator
- */
-public class StreamingDataGenerator {
- private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
- private static Random random = new Random();
- private static String[] decimalFormat = new String[] { "%.4f", "%.5f", "%.6f" };
-
- public static Iterator<String> generate(final long start, final long end, final int count) {
- final KylinConfig config = KylinConfig.getInstanceFromEnv();
- final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table_ii");
- final IIDesc iiDesc = ii.getDescriptor();
- final List<TblColRef> columns = iiDesc.listAllColumns();
-
- return new Iterator<String>() {
- private Map<String, String> values = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- private int index = 0;
-
- @Override
- public boolean hasNext() {
- return this.index < count;
- }
-
- @Override
- public String next() {
- values.clear();
- long ts = this.createTs(start, end);
- values.put("minute_start", Long.toString(TimeUtil.getMinuteStart(ts)));
- values.put("hour_start", Long.toString(TimeUtil.getHourStart(ts)));
- values.put("day_start", Long.toString(TimeUtil.getDayStart(ts)));
- values.put("itm", Integer.toString(random.nextInt(20)));
- values.put("site", Integer.toString(random.nextInt(5)));
-
- values.put("gmv", String.format(decimalFormat[random.nextInt(3)], random.nextFloat() * 100));
- values.put("item_count", Integer.toString(random.nextInt(5)));
-
- if (values.size() != columns.size()) {
- throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
- }
-
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- try {
- JsonUtil.writeValue(os, values);
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- index++;
- return new String(os.toByteArray());
- }
-
- @Override
- public void remove() {
- }
-
- private long createTs(final long start, final long end) {
- return start + (long) (random.nextDouble() * (end - start));
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
deleted file mode 100644
index dcd460b..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionary;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StringStreamParser;
-import org.apache.kylin.streaming.invertedindex.SliceBuilder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- */
-public class IITest extends LocalFileMetadataTestCase {
-
- String iiName = "test_kylin_ii_inner_join";
- IIInstance ii;
- IIDesc iiDesc;
- String cubeName = "test_kylin_cube_with_slr_empty";
-
- List<IIRow> iiRows;
-
- final String[] inputData = new String[] { //
- "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
- "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
- "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
-
- @Before
- public void setUp() throws Exception {
- this.createTestMetadata();
- this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
- this.iiDesc = ii.getDescriptor();
-
- List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
- @Nullable
- @Override
- public StreamMessage apply(String input) {
- return new StreamMessage(System.currentTimeMillis(), input.getBytes());
- }
- });
-
- List<List<String>> parsedStreamMessages = Lists.newArrayList();
- StreamParser parser = StringStreamParser.instance;
-
- MicroStreamBatch batch = new MicroStreamBatch(0);
- for (StreamMessage message : streamMessages) {
- ParsedStreamMessage parsedStreamMessage = parser.parse(message);
- if ((parsedStreamMessage.isAccepted())) {
- batch.add(parsedStreamMessage);
- }
- }
-
- iiRows = Lists.newArrayList();
- final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
- IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
- for (IIRow iiRow : codec.encodeKeyValue(slice)) {
- iiRows.add(iiRow);
- }
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- /**
- * simulate stream building into slices, and encode the slice into IIRows.
- * Then reconstruct the IIRows to slice.
- */
- @Test
- public void basicTest() {
- Queue<IIRow> buffer = Lists.newLinkedList();
- FIFOIterable bufferIterable = new FIFOIterable(buffer);
- TableRecordInfo info = new TableRecordInfo(iiDesc);
- TableRecordInfoDigest digest = info.getDigest();
- KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
- Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
-
- Assert.assertTrue(!slices.hasNext());
- Assert.assertEquals(iiRows.size(), digest.getColumnCount());
-
- for (int i = 0; i < digest.getColumnCount(); ++i) {
- buffer.add(iiRows.get(i));
-
- if (i != digest.getColumnCount() - 1) {
- Assert.assertTrue(!slices.hasNext());
- } else {
- Assert.assertTrue(slices.hasNext());
- }
- }
-
- Slice newSlice = slices.next();
- Assert.assertEquals(newSlice.getLocalDictionaries()[0].getSize(), 2);
- }
-
- @Test
- public void IIEndpointTest() {
- TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
- CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
- CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
-
- FunctionDesc f1 = new FunctionDesc();
- f1.setExpression("SUM");
- ParameterDesc p1 = new ParameterDesc();
- p1.setType("column");
- p1.setValue("PRICE");
- f1.setParameter(p1);
- f1.setReturnType("decimal(19,4)");
-
- TblColRef column = ii.getDescriptor().findColumnRef("default.test_kylin_fact", "cal_dt");
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
- ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter);
- ConstantTupleFilter constantFilter = null;
- constantFilter = new ConstantTupleFilter(("2012-08-16"));
- compareFilter.addChild(constantFilter);
-
- EndpointAggregators aggregators = EndpointAggregators.fromFunctions(info, Collections.singletonList(f1));
- CoprocessorFilter filter = CoprocessorFilter.fromFilter(new ClearTextDictionary(info), compareFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
-
- final Iterator<IIRow> iiRowIterator = iiRows.iterator();
-
- IIEndpoint endpoint = new IIEndpoint();
- IIProtos.IIResponseInternal response = endpoint.getResponse(new RegionScanner() {
- @Override
- public HRegionInfo getRegionInfo() {
- throw new NotImplementedException();
- }
-
- @Override
- public boolean isFilterDone() throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public boolean reseek(byte[] row) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public long getMaxResultSize() {
- throw new NotImplementedException();
-
- }
-
- @Override
- public long getMvccReadPoint() {
- throw new NotImplementedException();
- }
-
- @Override
- public boolean nextRaw(List<Cell> result) throws IOException {
- if (iiRowIterator.hasNext()) {
- IIRow iiRow = iiRowIterator.next();
- result.addAll(iiRow.makeCells());
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public void close() throws IOException {
- throw new NotImplementedException();
- }
- }, type, projector, aggregators, filter);
-
- Assert.assertEquals(2, response.getRowsList().size());
- System.out.println(response.getRowsList().size());
- Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
- for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
- byte[] measuresBytes = responseRow.getMeasures().toByteArray();
- List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
- Assert.assertTrue(answers.contains(metrics.get(0)));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
deleted file mode 100644
index fd2eceb..0000000
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.lock.MockJobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-/**
- */
-public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
-
- private DefaultScheduler scheduler;
-
- protected ExecutableManager jobService;
-
- static void setFinalStatic(Field field, Object newValue) throws Exception {
- field.setAccessible(true);
-
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
- field.set(null, newValue);
- }
-
- protected void waitForJobFinish(String jobId) {
- while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
- final ExecutableState status = job.getStatus();
- if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
- break;
- } else {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- protected void waitForJobStatus(String jobId, ExecutableState state, long interval) {
- while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
- if (job.getStatus() == state) {
- break;
- } else {
- try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- @BeforeClass
- public static void beforeClass() {
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
- DeployUtil.overrideJobJarLocations();
- }
-
- @Before
- public void setup() throws Exception {
- createTestMetadata();
- setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
- jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
- scheduler = DefaultScheduler.getInstance();
- scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
- if (!scheduler.hasStarted()) {
- throw new RuntimeException("scheduler has not been started");
- }
-
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index 7c33d02..0000000
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SelfStopExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.junit.Test;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
- @Test
- public void testSingleTaskJob() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- }
-
- @Test
- public void testSucceed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testSucceedAndFailed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new FailedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testSucceedAndError() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new ErrorTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testDiscard() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SelfStopExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
- jobService.discardJob(job.getId());
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
- Thread.sleep(5000);
- System.out.println(job);
- }
-
- @Test
- public void testSchedulerPool() throws InterruptedException {
- ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
- final CountDownLatch countDownLatch = new CountDownLatch(3);
- ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- countDownLatch.countDown();
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
- assertTrue("future should still running", future.cancel(true));
-
- final CountDownLatch countDownLatch2 = new CountDownLatch(3);
- ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- countDownLatch2.countDown();
- throw new RuntimeException();
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
- assertFalse("future2 should has been stopped", future2.cancel(true));
-
- final CountDownLatch countDownLatch3 = new CountDownLatch(3);
- ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- countDownLatch3.countDown();
- throw new RuntimeException();
- } catch (Exception e) {
- }
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
- assertTrue("future3 should still running", future3.cancel(true));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
deleted file mode 100644
index be4fa26..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-@Ignore
-public class CubeStreamConsumerTest {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
-
- private KylinConfig kylinConfig;
-
- private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
- // remove all existing segments
- CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
- }
-
- @Test
- public void test() throws Exception {
- LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
- queues.add(queue);
- StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
- final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
- loadDataFromLocalFile(queue, 100000);
- future.get();
- }
-
- private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
- BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
- String line;
- int count = 0;
- while ((line = br.readLine()) != null && count++ < maxCount) {
- final List<String> strings = Arrays.asList(line.split("\t"));
- queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
- }
- queue.put(StreamMessage.EOF);
- }
-}