You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/26 17:21:04 UTC
[18/32] incubator-kylin git commit: KYLIN-697 non-integration tests
all passed
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a0d9d20/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
new file mode 100644
index 0000000..cfb7bdb
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
@@ -0,0 +1,270 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.junit.Assert.fail;
+
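+/**
+ * Integration test that builds each inverted index named in {@code II_NAME}:
+ * it materializes the joined flat table in Hive, reads the rows back sorted by
+ * the timestamp column, and feeds them to an {@code IIStreamBuilder} through
+ * an in-memory queue.
+ */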
+public class ITBuildIIWithStreamTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITBuildIIWithStreamTest.class);
+
+ private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
+ private IIManager iiManager;
+ private KylinConfig kylinConfig;
+
+ /**
+  * One-time setup: adds the sandbox test-data directory to the classpath and
+  * sets the {@code hdp.version} system property.
+  */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+     final String sandboxPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+     logger.info("Adding to classpath: " + sandboxPath);
+     ClassUtil.addClasspath(sandboxPath);
+     System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+ }
+
+ /**
+  * Per-test setup: recreates the sandbox test metadata, overrides the job jar
+  * locations, and makes sure every inverted index named in {@code II_NAME}
+  * is in the DISABLED state before the test body runs.
+  */
+ @Before
+ public void before() throws Exception {
+     HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+     DeployUtil.overrideJobJarLocations();
+
+     kylinConfig = KylinConfig.getInstanceFromEnv();
+     // A single lookup is sufficient; nothing changes the config in between.
+     iiManager = IIManager.getInstance(kylinConfig);
+     for (String iiInstance : II_NAME) {
+         IIInstance ii = iiManager.getII(iiInstance);
+         // Only write metadata back when the status actually has to change.
+         if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+             ii.setStatus(RealizationStatusEnum.DISABLED);
+             iiManager.updateII(ii, true);
+         }
+     }
+ }
+
+ /**
+  * One-time teardown: delegates to {@link #backup()} after all tests have run.
+  */
+ @AfterClass
+ public static void afterClass() throws Exception {
+     backup();
+ }
+
+ /**
+  * Runs {@code StorageCleanupJob} with {@code --delete true}.
+  *
+  * @return the exit code reported by {@code ToolRunner.run}
+  */
+ private static int cleanupOldStorage() throws Exception {
+     return ToolRunner.run(new StorageCleanupJob(), new String[] { "--delete", "true" });
+ }
+
+ /**
+  * Cleans up old storage and, only if that cleanup exits with code 0,
+  * exports the HBase data.
+  */
+ private static void backup() throws Exception {
+     if (cleanupOldStorage() != 0) {
+         // Cleanup did not succeed: skip the export.
+         return;
+     }
+     exportHBaseData();
+ }
+
+ /**
+  * Exports tables via a fresh {@code ExportHBaseData} instance.
+  */
+ private static void exportHBaseData() throws IOException {
+     new ExportHBaseData().exportTables();
+ }
+
+ /**
+  * Creates and populates the intermediate (joined flat) Hive table for an
+  * inverted index by running generated drop / create / insert HQL in a single
+  * {@code hive -e} command.
+  *
+  * @param desc        descriptor of the inverted index whose flat table is built
+  * @param kylinConfig configuration used for the job engine settings and for the
+  *                    CLI command executor (shadows the field of the same name)
+  * @return the name of the intermediate table, derived from a random UUID
+  * @throws IOException      declared for the command execution step
+  * @throws RuntimeException if the insert statement cannot be generated; the
+  *                          underlying {@link IOException} is kept as the cause
+  */
+ private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
+     IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
+     JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+     final String uuid = UUID.randomUUID().toString();
+     final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
+     final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
+     String insertDataHqls;
+     try {
+         insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
+     } catch (IOException e1) {
+         // Chain the cause so the real failure is visible in the test report
+         // instead of only on stderr.
+         throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e1);
+     }
+
+     ShellExecutable step = new ShellExecutable();
+     // Local, single-threaded buffer: StringBuilder is sufficient.
+     StringBuilder buf = new StringBuilder();
+     buf.append("hive -e \"");
+     buf.append(dropTableHql).append("\n");
+     buf.append(createTableHql).append("\n");
+     buf.append(insertDataHqls).append("\n");
+     buf.append("\"");
+
+     step.setCmd(buf.toString());
+     logger.info(step.getCmd());
+     step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+     kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
+     return intermediateTableDesc.getTableName(uuid);
+ }
+
+ private void clearSegment(String iiName) throws Exception {
+ IIInstance ii = iiManager.getII(iiName);
+ ii.getSegments().clear();
+ iiManager.updateII(ii,true);
+ }
+
+ private IISegment createSegment(String iiName) throws Exception {
+ clearSegment(iiName);
+ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+ f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ long date1 = 0;
+ long date2 = f.parse("2015-01-01").getTime();
+ return buildSegment(iiName, date1, date2);
+ }
+
+ private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
+ IIInstance iiInstance = iiManager.getII(iiName);
+ IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+ iiInstance.getSegments().add(segment);
+ iiManager.updateII(iiInstance,true);
+ return segment;
+ }
+
+ /**
+  * Builds one inverted index end to end.
+  *
+  * Steps: create the intermediate Hive table, log its schema and how each
+  * column is classified, create a fresh segment and its HTable, read all rows
+  * of the intermediate table sorted by the timestamp column into a queue
+  * terminated by {@code StreamMessage.EOF}, then run an {@code IIStreamBuilder}
+  * over that queue on a single worker thread and wait for it to finish.
+  *
+  * The Hive reader is closed and the executor is shut down even when a step
+  * throws, so a failing build does not leave a reader or a worker thread behind.
+  *
+  * @param iiName name of the inverted index to build
+  * @throws Exception if any preparation step fails; a failure inside the
+  *                   stream builder is reported through {@code fail(...)}
+  */
+ private void buildII(String iiName) throws Exception {
+     final IIDesc desc = iiManager.getII(iiName).getDescriptor();
+     final String tableName = createIntermediateTable(desc, kylinConfig);
+     logger.info("intermediate table name:" + tableName);
+     final Configuration conf = new Configuration();
+     HCatInputFormat.setInput(conf, "default", tableName);
+     final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
+     logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+     final List<TblColRef> tblColRefs = desc.listAllColumns();
+     for (TblColRef tblColRef : tblColRefs) {
+         // Label each column by what isMetricsCol() actually reports.
+         if (desc.isMetricsCol(tblColRef)) {
+             logger.info("metric:" + tblColRef.getName());
+         } else {
+             logger.info("non-metric:" + tblColRef.getName());
+         }
+     }
+     LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
+     final IISegment segment = createSegment(iiName);
+     String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
+     ToolRunner.run(new IICreateHTableJob(), args);
+
+     final ExecutorService executorService = Executors.newSingleThreadExecutor();
+     try {
+         final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
+
+         final int count;
+         // The reader is opened right where it is consumed so that it can be
+         // closed in a finally block.
+         final HiveTableReader reader = new HiveTableReader("default", tableName);
+         try {
+             final List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
+             count = sorted.size();
+             for (String[] row : sorted) {
+                 logger.info("another row: " + StringUtils.join(row, ","));
+                 queue.put(parse(row));
+             }
+         } finally {
+             reader.close();
+         }
+
+         logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
+         // EOF is queued before the builder is submitted, so the builder sees
+         // the complete, already-terminated input.
+         queue.put(StreamMessage.EOF);
+         final Future<?> future = executorService.submit(streamBuilder);
+         try {
+             future.get();
+         } catch (Exception e) {
+             logger.error("stream build failed", e);
+             fail("stream build failed");
+         }
+     } finally {
+         // Without this the non-daemon worker thread outlives the test.
+         executorService.shutdown();
+     }
+
+     logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
+ }
+
+ /**
+  * Builds every inverted index named in {@code II_NAME} and marks each one
+  * READY once its build has completed.
+  */
+ @Test
+ public void test() throws Exception {
+     for (String name : II_NAME) {
+         buildII(name);
+
+         final IIInstance built = iiManager.getII(name);
+         if (built.getStatus() == RealizationStatusEnum.READY) {
+             // Already READY: nothing to persist.
+             continue;
+         }
+         built.setStatus(RealizationStatusEnum.READY);
+         iiManager.updateII(built, true);
+     }
+ }
+
+ private StreamMessage parse(String[] row) {
+ return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+ }
+
+ private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
+ List<String[]> unsorted = Lists.newArrayList();
+ while (reader.next()) {
+ unsorted.add(reader.getRow());
+ }
+ Collections.sort(unsorted, new Comparator<String[]>() {
+ @Override
+ public int compare(String[] o1, String[] o2) {
+ long t1 = DateFormat.stringToMillis(o1[tsCol]);
+ long t2 = DateFormat.stringToMillis(o2[tsCol]);
+ return Long.compare(t1, t2);
+ }
+ });
+ return unsorted;
+ }
+
+}