You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/19 14:16:21 UTC

[1/2] kylin git commit: KYLIN-2032 do not need dict for partition col if it is not in dimension

Repository: kylin
Updated Branches:
  refs/heads/master c9dcb07c7 -> a8a1b6e03


KYLIN-2032 do not need dict for partition col if it is not in dimension

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a8a1b6e0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a8a1b6e0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a8a1b6e0

Branch: refs/heads/master
Commit: a8a1b6e037767b75a3184f4b5db6f384f622a89e
Parents: a08dd2e
Author: shaofengshi <sh...@apache.org>
Authored: Mon Sep 19 22:15:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 19 22:16:06 2016 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/cube/CubeManager.java      | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a8a1b6e0/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 57b9510..2660c9f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -943,13 +943,6 @@ public class CubeManager implements IRealizationProvider {
             }
         }
 
-        // add partition column in all case
-        if (cubeDesc.getModel().getPartitionDesc() != null) {
-            TblColRef partitionCol = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (factDictCols.contains(partitionCol) == false) {
-                factDictCols.add(partitionCol);
-            }
-        }
         return factDictCols;
     }
 }


[2/2] kylin git commit: refactor BuildCubeWithStream

Posted by sh...@apache.org.
refactor BuildCubeWithStream

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a08dd2e0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a08dd2e0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a08dd2e0

Branch: refs/heads/master
Commit: a08dd2e03900b321617647d1dbf1c4ee8b4b18c2
Parents: c9dcb07
Author: shaofengshi <sh...@apache.org>
Authored: Mon Sep 19 20:18:35 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 19 22:16:06 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |   7 +-
 .../kylin/provision/BuildCubeWithStream.java    |  10 +-
 .../kylin/provision/BuildCubeWithStream2.java   | 145 +------------------
 3 files changed, 12 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a08dd2e0/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9b282e3..9e9df05 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -187,6 +187,7 @@ public class DeployUtil {
         File tmpFile = File.createTempFile(factTableName, "csv");
         FileOutputStream out = new FileOutputStream(tmpFile);
 
+        InputStream tempIn = null;
         try {
             if (store.exists(factTablePath)) {
                 InputStream oldContent = store.getResource(factTablePath).inputStream;
@@ -194,13 +195,15 @@ public class DeployUtil {
             }
             IOUtils.copy(in, out);
             IOUtils.closeQuietly(in);
+            IOUtils.closeQuietly(out);
 
             store.deleteResource(factTablePath);
-            in = new FileInputStream(tmpFile);
-            store.putResource(factTablePath, in, System.currentTimeMillis());
+            tempIn = new FileInputStream(tmpFile);
+            store.putResource(factTablePath, tempIn, System.currentTimeMillis());
         } finally {
             IOUtils.closeQuietly(out);
             IOUtils.closeQuietly(in);
+            IOUtils.closeQuietly(tempIn);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a08dd2e0/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 6e5313f..bfe1d0a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -62,10 +62,10 @@ public class BuildCubeWithStream {
 
     private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class);
 
-    private CubeManager cubeManager;
+    protected CubeManager cubeManager;
     private DefaultScheduler scheduler;
     protected ExecutableManager jobService;
-    private static final String cubeName = "test_streaming_table_cube";
+    static final String cubeName = "test_streaming_table_cube";
 
     private KafkaConfig kafkaConfig;
     private MockKafka kafkaServer;
@@ -114,13 +114,13 @@ public class BuildCubeWithStream {
         Assert.assertEquals(topicName, topicMetadata.topic());
     }
 
-    private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
+    protected void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
         Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
         DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
         logger.info("Test data inserted into Kafka");
     }
 
-    private void clearSegment(String cubeName) throws Exception {
+    protected void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
         // remove all existing segments
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -187,7 +187,7 @@ public class BuildCubeWithStream {
         return job.getId();
     }
 
-    private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+    protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a08dd2e0/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
index 2812446..7959701 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
@@ -18,13 +18,11 @@
 
 package org.apache.kylin.provision;
 
-import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Random;
 import java.util.TimeZone;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,32 +30,9 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.job.streaming.Kafka10DataLoader;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,79 +42,12 @@ import static java.lang.Thread.sleep;
 /**
  *  for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently.
  */
-public class BuildCubeWithStream2 {
+public class BuildCubeWithStream2 extends BuildCubeWithStream {
 
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
-
-    private CubeManager cubeManager;
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-    private static final String cubeName = "test_streaming_table_cube";
-
-    private KafkaConfig kafkaConfig;
-    private MockKafka kafkaServer;
     private static boolean generateData = true;
 
-    public void before() throws Exception {
-        deployEnv();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.createInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        cubeManager = CubeManager.getInstance(kylinConfig);
-
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final String factTable = cubeInstance.getFactTable();
-
-        final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
-        final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable);
-        kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
-
-        String topicName = UUID.randomUUID().toString();
-        String localIp = NetworkUtils.getLocalIp();
-        BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
-        brokerConfig.setHost(localIp);
-        kafkaConfig.setTopic(topicName);
-        KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
-
-        startEmbeddedKafka(topicName, brokerConfig);
-    }
-
-    private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
-        //Start mock Kakfa
-        String zkConnectionStr = "sandbox:2181";
-        ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
-        // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
-        kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
-        kafkaServer.start();
-
-        kafkaServer.createTopic(topicName, 3, 1);
-        kafkaServer.waitTopicUntilReady(topicName);
-
-        MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName);
-        Assert.assertEquals(topicName, topicMetadata.topic());
-    }
-
-    private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException {
-        if (numberOfRecords <= 0)
-            return;
-        Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader);
-        logger.info("Test data inserted into Kafka");
-    }
-
-    private void clearSegment(String cubeName) throws Exception {
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
-    }
-
+    @Override
     public void build() throws Exception {
         clearSegment(cubeName);
         SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
@@ -204,55 +112,6 @@ public class BuildCubeWithStream2 {
 
     }
 
-
-    private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
-        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getStatus();
-    }
-
-    protected void deployEnv() throws IOException {
-        DeployUtil.overrideJobJarLocations();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-    }
-
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
-        }
-        HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-    }
-
-    public static void afterClass() throws Exception {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    public void after() {
-        kafkaServer.stop();
-        DefaultScheduler.destroyInstance();
-    }
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
-                break;
-            } else {
-                try {
-                    sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         try {
             beforeClass();