You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:48 UTC

[11/50] incubator-kylin git commit: fix

fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c3ff4f44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c3ff4f44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c3ff4f44

Branch: refs/heads/streaming-localdict
Commit: c3ff4f447f0884da9635c783ab5aa1d25243887b
Parents: 227edf7
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Mar 26 19:38:38 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Mar 26 19:38:38 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/IIStreamBuilderTest.java   | 80 ++++++++++++++++++++
 .../kylin/streaming/StreamingBootstrap.java     | 23 ++++--
 .../apache/kylin/streaming/StreamingCLI.java    |  3 +-
 .../invertedindex/IIStreamBuilderTest.java      | 41 ----------
 4 files changed, 98 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3ff4f44/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
new file mode 100644
index 0000000..35a0fe9
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/IIStreamBuilderTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.streaming.StreamingBootstrap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * Created by qianzhou on 3/6/15.
+ */
+public class IIStreamBuilderTest extends HBaseMetadataTestCase {
+
+    private KylinConfig kylinConfig;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+    }
+
+    @After
+    public void after() {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        StreamingBootstrap.getInstance(kylinConfig).startStreaming("eagle", 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3ff4f44/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
index 4b7c6b7..bd1ab42 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
@@ -39,7 +39,6 @@ import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.PartitionMetadata;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.model.IIDesc;
@@ -54,11 +53,19 @@ import java.util.concurrent.Future;
  */
 public class StreamingBootstrap {
 
-    private static KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-    private static StreamManager streamManager = StreamManager.getInstance(kylinConfig);
-    private static IIManager iiManager = IIManager.getInstance(kylinConfig);
-    private static IIDescManager iiDescManager = IIDescManager.getInstance(kylinConfig);
+    private KylinConfig kylinConfig;
+    private StreamManager streamManager;
+    private IIManager iiManager;
 
+    public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
+        return new StreamingBootstrap(kylinConfig);
+    }
+
+    private StreamingBootstrap(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.streamManager = StreamManager.getInstance(kylinConfig);
+        this.iiManager = IIManager.getInstance(kylinConfig);
+    }
 
     private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
         final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
@@ -69,7 +76,7 @@ public class StreamingBootstrap {
         }
     }
 
-    public static void startStreaming(String streamingConf, int partitionId) throws Exception {
+    public void startStreaming(String streamingConf, int partitionId) throws Exception {
         final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
         Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
         final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
@@ -94,7 +101,9 @@ public class StreamingBootstrap {
         };
         final IIDesc desc = ii.getDescriptor();
         Executors.newSingleThreadExecutor().submit(consumer);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(new IIStreamBuilder(consumer.getStreamQueue(), ii.getSegments().get(0).getStorageLocationIdentifier(), desc, partitionId));
+        final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), ii.getSegments().get(0).getStorageLocationIdentifier(), desc, partitionId);
+        task.setStreamParser(JsonStreamParser.instance);
+        final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
         future.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3ff4f44/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
index 70290f1..dac8ce0 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
@@ -35,6 +35,7 @@
 package org.apache.kylin.streaming;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ public class StreamingCLI {
             }
             if (args[0].equals("start")) {
                 String kafkaConfName = args[1];
-                StreamingBootstrap.startStreaming(kafkaConfName, 0);
+                StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
             } else if (args.equals("stop")) {
 
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3ff4f44/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilderTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilderTest.java
deleted file mode 100644
index 11b8868..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilderTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-/**
- * Created by qianzhou on 3/6/15.
- */
-public class IIStreamBuilderTest {
-}