You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 02:27:52 UTC
[03/12] incubator-kylin git commit: KYLIN-1010 Decompose project job
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
deleted file mode 100644
index dc6d312..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.MicroStreamBatchConsumer;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
-
- @Before
- public void setup() {
- this.createTestMetadata();
-
- }
-
- @After
- public void clear() {
- this.cleanupTestMetadata();
- }
-
- private List<StreamMessage> prepareTestData(long start, long end, int count) {
- double step = (double) (end - start) / (count - 1);
- long ts = start;
- int offset = 0;
- ArrayList<StreamMessage> result = Lists.newArrayList();
- for (int i = 0; i < count - 1; ++i) {
- result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
- ts += step;
- }
- result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
- assertEquals(count, result.size());
- assertEquals(start + "", new String(result.get(0).getRawData()));
- assertEquals(end + "", new String(result.get(count - 1).getRawData()));
- return result;
- }
-
- @Test
- public void test() throws ExecutionException, InterruptedException {
-
- List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
- queues.add(new LinkedBlockingQueue<StreamMessage>());
- queues.add(new LinkedBlockingQueue<StreamMessage>());
-
- final long interval = 3000L;
- final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
-
- final List<Integer> partitionIds = Lists.newArrayList();
- for (int i = 0; i < queues.size(); i++) {
- partitionIds.add(i);
- }
-
- final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
- @Override
- public void consume(MicroStreamBatch microStreamBatch) throws Exception {
- logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
- }
-
- @Override
- public void stop() {
- logger.info("consumer stopped");
- }
- };
- final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
-
- streamBuilder.setStreamParser(new StreamParser() {
- @Override
- public ParsedStreamMessage parse(StreamMessage streamMessage) {
- return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
- }
- });
-
- Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
- long timeout = nextPeriodStart + interval;
- int messageCount = 0;
- int inPeriodMessageCount = 0;
- int expectedOffset = 0;
- logger.info("prepare to add StreamMessage");
- while (true) {
- long ts = System.currentTimeMillis();
- if (ts >= timeout + interval) {
- break;
- }
- if (ts >= nextPeriodStart && ts < timeout) {
- inPeriodMessageCount++;
- }
- for (BlockingQueue<StreamMessage> queue : queues) {
- queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
- }
- if (expectedOffset == 0 && ts >= timeout) {
- expectedOffset = messageCount - 1;
- }
- messageCount++;
- Thread.sleep(10);
- }
- logger.info("totally put " + messageCount + " StreamMessages");
- logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
-
- for (BlockingQueue<StreamMessage> queue : queues) {
- queue.put(StreamMessage.EOF);
- }
-
- future.get();
-
- for (BlockingQueue<StreamMessage> queue : queues) {
- queue.take();
- }
-
- final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
- logger.info("offset:" + offsets);
- for (Long offset : offsets.values()) {
- assertEquals(expectedOffset, offset.longValue());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
deleted file mode 100644
index 075a048..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
-
-/**
- * this is for generating fact table data for test_streaming_table (cube streaming)
- */
-public class StreamingTableDataGenerator {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
- private static final ObjectMapper mapper = new ObjectMapper();
-
- public static List<String> generate(int recordCount, long startTime, long endTime, String tableName) {
- Preconditions.checkArgument(startTime < endTime);
- Preconditions.checkArgument(recordCount > 0);
-
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
-
- SortedMultiset<Long> times = TreeMultiset.create();
- Random r = new Random();
- for (int i = 0; i < recordCount; i++) {
- long t = startTime + (long) ((endTime - startTime) * r.nextDouble());
- times.add(t);
- }
-
- List<String> ret = Lists.newArrayList();
- HashMap<String, String> kvs = Maps.newHashMap();
- for (long time : times) {
- kvs.clear();
- kvs.put("timestamp", String.valueOf(time));
- for (ColumnDesc columnDesc : tableDesc.getColumns()) {
- String lowerCaseColumnName = columnDesc.getName().toLowerCase();
- DataType dataType = columnDesc.getType();
- if (dataType.isDateTimeFamily()) {
- //TimedJsonStreamParser will derived minute_start,hour_start,day_start from timestamp
- continue;
- } else if (dataType.isStringFamily()) {
- char c = (char) ('A' + (int) (26 * r.nextDouble()));
- kvs.put(lowerCaseColumnName, String.valueOf(c));
- } else if (dataType.isIntegerFamily()) {
- int v = r.nextInt(10000);
- kvs.put(lowerCaseColumnName, String.valueOf(v));
- } else if (dataType.isNumberFamily()) {
- String v = String.format("%.4f", r.nextDouble() * 100);
- kvs.put(lowerCaseColumnName, v);
- }
- }
- try {
- ret.add(mapper.writeValueAsString(kvs));
- } catch (JsonProcessingException e) {
- logger.error("error!", e);
- }
- }
-
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
deleted file mode 100644
index 2a9893a..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.source.hive.cardinality.ColumnCardinalityMapper;
-import org.apache.kylin.source.hive.cardinality.ColumnCardinalityReducer;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- *
- */
-public class ColumnCardinalityReducerTest {
-
- public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
-
- ReduceDriver<IntWritable, BytesWritable, IntWritable, LongWritable> reduceDriver;
- String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
- @Before
- public void setUp() {
- ColumnCardinalityReducer reducer = new ColumnCardinalityReducer();
- reduceDriver = ReduceDriver.newReduceDriver(reducer);
- }
-
- private byte[] getBytes(String str) throws IOException {
- HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
- StringTokenizer tokenizer = new StringTokenizer(str, ColumnCardinalityMapper.DEFAULT_DELIM);
- int i = 0;
- while (tokenizer.hasMoreTokens()) {
- String temp = i + "_" + tokenizer.nextToken();
- i++;
- hllc.add(Bytes.toBytes(temp));
- }
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- return buf.array();
- }
-
- @Test
- public void testReducer() throws IOException {
- IntWritable key1 = new IntWritable(1);
- List<BytesWritable> values1 = new ArrayList<BytesWritable>();
- values1.add(new BytesWritable(getBytes(strArr)));
-
- IntWritable key2 = new IntWritable(2);
- List<BytesWritable> values2 = new ArrayList<BytesWritable>();
- values2.add(new BytesWritable(getBytes(strArr + " x")));
-
- IntWritable key3 = new IntWritable(3);
- List<BytesWritable> values3 = new ArrayList<BytesWritable>();
- values3.add(new BytesWritable(getBytes(strArr + " xx")));
-
- IntWritable key4 = new IntWritable(4);
- List<BytesWritable> values4 = new ArrayList<BytesWritable>();
- values4.add(new BytesWritable(getBytes(strArr + " xxx")));
-
- IntWritable key5 = new IntWritable(5);
- List<BytesWritable> values5 = new ArrayList<BytesWritable>();
- values5.add(new BytesWritable(getBytes(strArr + " xxxx")));
-
- reduceDriver.withInput(key1, values1);
- reduceDriver.withInput(key2, values2);
- reduceDriver.withInput(key3, values3);
- reduceDriver.withInput(key4, values4);
- reduceDriver.withInput(key5, values5);
-
- List<Pair<IntWritable, LongWritable>> result = reduceDriver.run();
-
- assertEquals(5, result.size());
-
- int outputKey1 = result.get(0).getFirst().get();
- LongWritable value1 = result.get(0).getSecond();
- assertTrue(outputKey1 == 1);
- assertTrue((10 == value1.get()) || (9 == value1.get()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
deleted file mode 100644
index 8218d51..0000000
--- a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
- @Before
- public void setup() throws Exception {
- super.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- super.cleanupTestMetadata();
- }
-
- @Test
- public void test() throws IOException {
- if (!useSandbox())
- return;
-
- KylinConfig config = getTestConfig();
- String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
- assertTrue(loaded.size() == toLoad.length);
- for (String str : toLoad)
- assertTrue(loaded.contains(str));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
deleted file mode 100644
index 57c0be3..0000000
--- a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This test case need the hive runtime; Please run it with sandbox;
- * @author shaoshi
- *
- * It is in the exclude list of default profile in pom.xml
- */
-public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
-
- @Test
- public void test() throws IOException {
- HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
- int rowNumber = 0;
- while (reader.next()) {
- String[] row = reader.getRow();
- Assert.assertEquals(9, row.length);
- //System.out.println(ArrayUtils.toString(row));
- rowNumber++;
- }
-
- reader.close();
- Assert.assertEquals(10000, rowNumber);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
deleted file mode 100644
index 0df632a..0000000
--- a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
-import org.apache.kylin.source.SourceFactory;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author yangli9
- *
- */
-public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
-
- SnapshotManager snapshotMgr;
-
- @Before
- public void setup() throws Exception {
- createTestMetadata();
- snapshotMgr = SnapshotManager.getInstance(getTestConfig());
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void basicTest() throws Exception {
- String tableName = "EDW.TEST_SITES";
- TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
- ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
- String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
-
- snapshotMgr.wipeoutCache();
-
- SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
-
- // compare hive & snapshot
- TableReader hiveReader = hiveTable.getReader();
- TableReader snapshotReader = snapshot.getReader();
-
- while (true) {
- boolean hiveNext = hiveReader.next();
- boolean snapshotNext = snapshotReader.next();
- assertEquals(hiveNext, snapshotNext);
-
- if (hiveNext == false)
- break;
-
- String[] hiveRow = hiveReader.getRow();
- String[] snapshotRow = snapshotReader.getRow();
- assertArrayEquals(hiveRow, snapshotRow);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/resources/data/flat_table/000000_0
----------------------------------------------------------------------
diff --git a/job/src/test/resources/data/flat_table/000000_0 b/job/src/test/resources/data/flat_table/000000_0
deleted file mode 100644
index 8b1b7cc..0000000
Binary files a/job/src/test/resources/data/flat_table/000000_0 and /dev/null differ