You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 02:27:52 UTC

[03/12] incubator-kylin git commit: KYLIN-1010 Decompose project job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
deleted file mode 100644
index dc6d312..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.MicroStreamBatchConsumer;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
-
-    @Before
-    public void setup() {
-        this.createTestMetadata();
-
-    }
-
-    @After
-    public void clear() {
-        this.cleanupTestMetadata();
-    }
-
-    private List<StreamMessage> prepareTestData(long start, long end, int count) {
-        double step = (double) (end - start) / (count - 1);
-        long ts = start;
-        int offset = 0;
-        ArrayList<StreamMessage> result = Lists.newArrayList();
-        for (int i = 0; i < count - 1; ++i) {
-            result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
-            ts += step;
-        }
-        result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
-        assertEquals(count, result.size());
-        assertEquals(start + "", new String(result.get(0).getRawData()));
-        assertEquals(end + "", new String(result.get(count - 1).getRawData()));
-        return result;
-    }
-
-    @Test
-    public void test() throws ExecutionException, InterruptedException {
-
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-
-        final long interval = 3000L;
-        final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
-
-        final List<Integer> partitionIds = Lists.newArrayList();
-        for (int i = 0; i < queues.size(); i++) {
-            partitionIds.add(i);
-        }
-
-        final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
-            @Override
-            public void consume(MicroStreamBatch microStreamBatch) throws Exception {
-                logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
-            }
-
-            @Override
-            public void stop() {
-                logger.info("consumer stopped");
-            }
-        };
-        final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
-
-        streamBuilder.setStreamParser(new StreamParser() {
-            @Override
-            public ParsedStreamMessage parse(StreamMessage streamMessage) {
-                return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
-            }
-        });
-
-        Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
-        long timeout = nextPeriodStart + interval;
-        int messageCount = 0;
-        int inPeriodMessageCount = 0;
-        int expectedOffset = 0;
-        logger.info("prepare to add StreamMessage");
-        while (true) {
-            long ts = System.currentTimeMillis();
-            if (ts >= timeout + interval) {
-                break;
-            }
-            if (ts >= nextPeriodStart && ts < timeout) {
-                inPeriodMessageCount++;
-            }
-            for (BlockingQueue<StreamMessage> queue : queues) {
-                queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
-            }
-            if (expectedOffset == 0 && ts >= timeout) {
-                expectedOffset = messageCount - 1;
-            }
-            messageCount++;
-            Thread.sleep(10);
-        }
-        logger.info("totally put " + messageCount + " StreamMessages");
-        logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.put(StreamMessage.EOF);
-        }
-
-        future.get();
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.take();
-        }
-
-        final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
-        logger.info("offset:" + offsets);
-        for (Long offset : offsets.values()) {
-            assertEquals(expectedOffset, offset.longValue());
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
deleted file mode 100644
index 075a048..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
-
-/**
- * this is for generating fact table data for test_streaming_table (cube streaming)
- */
-public class StreamingTableDataGenerator {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    public static List<String> generate(int recordCount, long startTime, long endTime, String tableName) {
-        Preconditions.checkArgument(startTime < endTime);
-        Preconditions.checkArgument(recordCount > 0);
-
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
-
-        SortedMultiset<Long> times = TreeMultiset.create();
-        Random r = new Random();
-        for (int i = 0; i < recordCount; i++) {
-            long t = startTime + (long) ((endTime - startTime) * r.nextDouble());
-            times.add(t);
-        }
-
-        List<String> ret = Lists.newArrayList();
-        HashMap<String, String> kvs = Maps.newHashMap();
-        for (long time : times) {
-            kvs.clear();
-            kvs.put("timestamp", String.valueOf(time));
-            for (ColumnDesc columnDesc : tableDesc.getColumns()) {
-                String lowerCaseColumnName = columnDesc.getName().toLowerCase();
-                DataType dataType = columnDesc.getType();
-                if (dataType.isDateTimeFamily()) {
-                    //TimedJsonStreamParser will derived minute_start,hour_start,day_start from timestamp
-                    continue;
-                } else if (dataType.isStringFamily()) {
-                    char c = (char) ('A' + (int) (26 * r.nextDouble()));
-                    kvs.put(lowerCaseColumnName, String.valueOf(c));
-                } else if (dataType.isIntegerFamily()) {
-                    int v = r.nextInt(10000);
-                    kvs.put(lowerCaseColumnName, String.valueOf(v));
-                } else if (dataType.isNumberFamily()) {
-                    String v = String.format("%.4f", r.nextDouble() * 100);
-                    kvs.put(lowerCaseColumnName, v);
-                }
-            }
-            try {
-                ret.add(mapper.writeValueAsString(kvs));
-            } catch (JsonProcessingException e) {
-                logger.error("error!", e);
-            }
-        }
-
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
deleted file mode 100644
index 2a9893a..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.source.hive.cardinality.ColumnCardinalityMapper;
-import org.apache.kylin.source.hive.cardinality.ColumnCardinalityReducer;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- * 
- */
-public class ColumnCardinalityReducerTest {
-
-    public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
-
-    ReduceDriver<IntWritable, BytesWritable, IntWritable, LongWritable> reduceDriver;
-    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
-    @Before
-    public void setUp() {
-        ColumnCardinalityReducer reducer = new ColumnCardinalityReducer();
-        reduceDriver = ReduceDriver.newReduceDriver(reducer);
-    }
-
-    private byte[] getBytes(String str) throws IOException {
-        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        StringTokenizer tokenizer = new StringTokenizer(str, ColumnCardinalityMapper.DEFAULT_DELIM);
-        int i = 0;
-        while (tokenizer.hasMoreTokens()) {
-            String temp = i + "_" + tokenizer.nextToken();
-            i++;
-            hllc.add(Bytes.toBytes(temp));
-        }
-        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        buf.clear();
-        hllc.writeRegisters(buf);
-        buf.flip();
-        return buf.array();
-    }
-
-    @Test
-    public void testReducer() throws IOException {
-        IntWritable key1 = new IntWritable(1);
-        List<BytesWritable> values1 = new ArrayList<BytesWritable>();
-        values1.add(new BytesWritable(getBytes(strArr)));
-
-        IntWritable key2 = new IntWritable(2);
-        List<BytesWritable> values2 = new ArrayList<BytesWritable>();
-        values2.add(new BytesWritable(getBytes(strArr + " x")));
-
-        IntWritable key3 = new IntWritable(3);
-        List<BytesWritable> values3 = new ArrayList<BytesWritable>();
-        values3.add(new BytesWritable(getBytes(strArr + " xx")));
-
-        IntWritable key4 = new IntWritable(4);
-        List<BytesWritable> values4 = new ArrayList<BytesWritable>();
-        values4.add(new BytesWritable(getBytes(strArr + " xxx")));
-
-        IntWritable key5 = new IntWritable(5);
-        List<BytesWritable> values5 = new ArrayList<BytesWritable>();
-        values5.add(new BytesWritable(getBytes(strArr + " xxxx")));
-
-        reduceDriver.withInput(key1, values1);
-        reduceDriver.withInput(key2, values2);
-        reduceDriver.withInput(key3, values3);
-        reduceDriver.withInput(key4, values4);
-        reduceDriver.withInput(key5, values5);
-
-        List<Pair<IntWritable, LongWritable>> result = reduceDriver.run();
-
-        assertEquals(5, result.size());
-
-        int outputKey1 = result.get(0).getFirst().get();
-        LongWritable value1 = result.get(0).getSecond();
-        assertTrue(outputKey1 == 1);
-        assertTrue((10 == value1.get()) || (9 == value1.get()));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
deleted file mode 100644
index 8218d51..0000000
--- a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
-    @Before
-    public void setup() throws Exception {
-        super.createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        super.cleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws IOException {
-        if (!useSandbox())
-            return;
-
-        KylinConfig config = getTestConfig();
-        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
-        assertTrue(loaded.size() == toLoad.length);
-        for (String str : toLoad)
-            assertTrue(loaded.contains(str));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
deleted file mode 100644
index 57c0be3..0000000
--- a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This test case need the hive runtime; Please run it with sandbox;
- * @author shaoshi
- *
- * It is in the exclude list of default profile in pom.xml
- */
-public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
-
-    @Test
-    public void test() throws IOException {
-        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
-        int rowNumber = 0;
-        while (reader.next()) {
-            String[] row = reader.getRow();
-            Assert.assertEquals(9, row.length);
-            //System.out.println(ArrayUtils.toString(row));
-            rowNumber++;
-        }
-
-        reader.close();
-        Assert.assertEquals(10000, rowNumber);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
deleted file mode 100644
index 0df632a..0000000
--- a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
-import org.apache.kylin.source.SourceFactory;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author yangli9
- * 
- */
-public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
-
-    SnapshotManager snapshotMgr;
-
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
-        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void basicTest() throws Exception {
-        String tableName = "EDW.TEST_SITES";
-        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
-        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
-        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
-
-        snapshotMgr.wipeoutCache();
-
-        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
-
-        // compare hive & snapshot
-        TableReader hiveReader = hiveTable.getReader();
-        TableReader snapshotReader = snapshot.getReader();
-
-        while (true) {
-            boolean hiveNext = hiveReader.next();
-            boolean snapshotNext = snapshotReader.next();
-            assertEquals(hiveNext, snapshotNext);
-
-            if (hiveNext == false)
-                break;
-
-            String[] hiveRow = hiveReader.getRow();
-            String[] snapshotRow = snapshotReader.getRow();
-            assertArrayEquals(hiveRow, snapshotRow);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/resources/data/flat_table/000000_0
----------------------------------------------------------------------
diff --git a/job/src/test/resources/data/flat_table/000000_0 b/job/src/test/resources/data/flat_table/000000_0
deleted file mode 100644
index 8b1b7cc..0000000
Binary files a/job/src/test/resources/data/flat_table/000000_0 and /dev/null differ