You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/08 09:27:25 UTC
[1/3] incubator-kylin git commit: KYLIN-1023 kylin streaming log
start end offset for each partition for data verification
Repository: incubator-kylin
Updated Branches:
refs/heads/devstreaming [created] bdada03f7
KYLIN-1023 kylin streaming log start end offset for each partition for data verification
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/54cab0ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/54cab0ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/54cab0ab
Branch: refs/heads/devstreaming
Commit: 54cab0ab4a5aed6e5c963c2465876723ba91bb9d
Parents: fe9e02c
Author: honma <ho...@ebay.com>
Authored: Wed Sep 16 10:53:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Sep 16 10:53:05 2015 +0800
----------------------------------------------------------------------
.../kylin/engine/streaming/StreamingCLI.java | 2 -
.../org/apache/kylin/job/tools/KafkaVerify.java | 101 +++++++++++++++++++
.../kylin/streaming/MicroStreamBatch.java | 4 +
.../kylin/streaming/OneOffStreamBuilder.java | 2 +-
.../apache/kylin/streaming/StreamFetcher.java | 19 ++--
.../apache/kylin/streaming/StreamingConfig.java | 2 +-
6 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
index 277ee69..8bf52c1 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-/**
- */
public class StreamingCLI {
private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
new file mode 100644
index 0000000..ee64e66
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+
+/**
+ * only for verify kylin streaming's correctness by comparing to data in original kafka topic
+ */
+public class KafkaVerify {
+
+ public static void main(String[] args) throws IOException {
+
+ System.out.println("start");
+
+ ObjectMapper mapper = new ObjectMapper();
+ JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+ long start = Long.valueOf(args[0]);
+ long end = Long.valueOf(args[1]);
+ long interval = Long.valueOf(args[2]);
+ int bucket = (int) ((end - start + interval - 1) / interval);
+
+ long qtySum[] = new long[bucket];
+ long qtyTotal = 0;
+ long counts[] = new long[bucket];
+ long countTotal = 0;
+ long processed = 0;
+ long minOffset = -1;
+ long maxOffset = -1;
+
+ try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
+ String s;
+ while ((s = br.readLine()) != null) {
+ // process the line.
+ if (++processed % 10000 == 1) {
+ System.out.println("processing " + processed);
+ }
+
+ Map<String, String> root = mapper.readValue(s, mapType);
+ String tsStr = root.get("sys_ts");
+
+ if (StringUtils.isEmpty(tsStr)) {
+ continue;
+ }
+ long ts = Long.valueOf(tsStr);
+ if (ts < start || ts >= end) {
+ continue;
+ }
+
+ if (minOffset == -1) {
+ minOffset = processed - 1;
+ }
+ maxOffset = processed - 1;
+
+ long qty = Long.valueOf(root.get("qty"));
+ int index = (int) ((ts - start) / interval);
+ qtySum[index] += qty;
+ qtyTotal += qty;
+ counts[index]++;
+ countTotal++;
+ }
+ }
+
+ System.out.println("qty sum is " + Arrays.toString(qtySum));
+ System.out.println("qty total is " + qtyTotal);
+ System.out.println("count is " + Arrays.toString(counts));
+ System.out.println("count total is " + countTotal);
+ System.out.println("first processed is " + minOffset);
+ System.out.println("last processed is " + maxOffset);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index f4d9e05..27f817e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -62,6 +62,10 @@ public final class MicroStreamBatch {
return this.rawMessageCount;
}
+ public final int getFilteredMessageCount() {
+ return this.streams.size();
+ }
+
public final void add(ParsedStreamMessage parsedStreamMessage) {
if (offset.getFirst() > parsedStreamMessage.getOffset()) {
offset.setFirst(parsedStreamMessage.getOffset());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
index 927873a..ae0f70f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -45,7 +45,7 @@ public class OneOffStreamBuilder implements Runnable {
final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
int partitionId = 0;
for (BlockingQueue<StreamMessage> queue : queues) {
- futures.add(executorService.submit(new StreamFetcher(partitionId, queue, countDownLatch, batchCondition, streamParser)));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, queue, countDownLatch, batchCondition, streamParser)));
}
countDownLatch.await();
List<MicroStreamBatch> batches = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index 85d09be..f429a49 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -7,6 +7,8 @@ import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
*/
public class StreamFetcher implements Callable<MicroStreamBatch> {
@@ -26,9 +28,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
this.streamParser = streamParser;
}
- private void clearCounter() {
- }
-
private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
long t = System.currentTimeMillis();
while (true) {
@@ -57,7 +56,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
while (true) {
if (microStreamBatch == null) {
microStreamBatch = new MicroStreamBatch(partitionId);
- clearCounter();
}
StreamMessage streamMessage = peek(streamMessageQueue, 60000);
if (streamMessage == null) {
@@ -83,21 +81,28 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
} else if (result == BatchCondition.Result.LAST_ACCEPT_FOR_BATCH) {
streamMessageQueue.take();
microStreamBatch.add(parsedStreamMessage);
- return microStreamBatch;
+ break;
} else if (result == BatchCondition.Result.DISCARD) {
streamMessageQueue.take();
} else if (result == BatchCondition.Result.REJECT) {
- return microStreamBatch;
+ logger.info("Partition :" + partitionId + " rejecting message at " + parsedStreamMessage.getOffset());
+ break;
}
} else {
streamMessageQueue.take();
}
}
+
+ Preconditions.checkArgument(microStreamBatch != null, "microStreamBatch is null!");
+ logger.info(String.format("Partition %d contributing %d filtered messages out from %d raw messages"//
+ , partitionId, microStreamBatch.getFilteredMessageCount(), microStreamBatch.getRawMessageCount()));
+ return microStreamBatch;
+
} catch (Exception e) {
logger.error("build stream error, stop building", e);
throw new RuntimeException("build stream error, stop building", e);
} finally {
- logger.info("one partition sign off");
+ logger.info("partition {} sign off", partitionId);
countDownLatch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
index 320768b..c2d5361 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
@@ -84,7 +84,7 @@ public class StreamingConfig extends RootPersistentEntity {
@JsonProperty("parserName")
private String parserName;
- //"configA=1;configB=2"
+ //"tsColName=timestamp;x=y"
@JsonProperty("parserProperties")
private String parserProperties;
[2/3] incubator-kylin git commit: fix migration cli
Posted by ma...@apache.org.
fix migration cli
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b66c2580
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b66c2580
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b66c2580
Branch: refs/heads/devstreaming
Commit: b66c25803a2f976cca067148278dbe7d7b0d79ef
Parents: 54cab0a
Author: honma <ho...@ebay.com>
Authored: Wed Sep 16 14:37:42 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Sep 16 16:44:03 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 2 +-
.../kylin/job/tools/CubeMigrationCLI.java | 19 +++-
.../storage/hbase/steps/HBaseMROutput2.java | 1 +
webapp/app/js/model/cubeDescModel.js | 104 ++++++++++---------
4 files changed, 69 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b66c2580/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index db213f7..43b8c4d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -502,7 +502,7 @@ public class KylinConfig implements Serializable {
}
public String getHbaseDefaultCompressionCodec() {
- return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC);
+ return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC,"");
}
public boolean isHiveKeepFlatTable() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b66c2580/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index 89d55f6..c68196a 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
@@ -87,7 +89,6 @@ public class CubeMigrationCLI {
private static void usage() {
System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName overwriteIfExists realExecute");
System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
-
}
public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -132,7 +133,6 @@ public class CubeMigrationCLI {
}
public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
-
moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, overwriteIfExists, realExecute);
}
@@ -143,8 +143,8 @@ public class CubeMigrationCLI {
logger.info("src metadata url is " + srcMetadataUrl);
logger.info("dst metadata url is " + dstMetadataUrl);
- int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase:");
- int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase:");
+ int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase");
+ int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase");
if (srcIndex < 0 || dstIndex < 0)
throw new IllegalStateException("Both metadata urls should be hbase metadata url");
@@ -162,6 +162,11 @@ public class CubeMigrationCLI {
for (CubeSegment segment : cube.getSegments()) {
String jobUuid = segment.getLastBuildJobID();
+
+ if (StringUtils.isEmpty(jobUuid)) {
+ //segments build from streaming does not have hdfs working dir
+ continue;
+ }
String src = JobInstance.getJobWorkingDir(jobUuid, srcConfig.getHdfsWorkingDirectory());
String tgt = JobInstance.getJobWorkingDir(jobUuid, dstConfig.getHdfsWorkingDirectory());
@@ -216,7 +221,11 @@ public class CubeMigrationCLI {
for (CubeSegment segment : cube.getSegments()) {
dictAndSnapshot.addAll(segment.getSnapshotPaths());
dictAndSnapshot.addAll(segment.getDictionaryPaths());
- metaResource.add(segment.getStatisticsResourcePath());
+
+ if (cube.getDescriptor().getEngineType() == IEngineAware.ID_MR_V2) {
+ //only V2 has this
+ metaResource.add(segment.getStatisticsResourcePath());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b66c2580/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 1e414be..79ef403 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -170,6 +170,7 @@ public class HBaseMROutput2 implements IMROutput2 {
scans.add(scan);
}
TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+ TableMapReduceUtil.initCredentials(job);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b66c2580/webapp/app/js/model/cubeDescModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/cubeDescModel.js b/webapp/app/js/model/cubeDescModel.js
index 86c8444..3376bfc 100644
--- a/webapp/app/js/model/cubeDescModel.js
+++ b/webapp/app/js/model/cubeDescModel.js
@@ -14,62 +14,64 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-KylinApp.service('CubeDescModel',function(){
+KylinApp.service('CubeDescModel', function () {
- this.cubeMetaFrame = {};
+ this.cubeMetaFrame = {};
- //
- this.createNew = function () {
- var cubeMeta = {
- "name": "",
- "model_name": "",
- "description": "",
- "dimensions": [],
- "measures": [
- { "id": 1,
- "name": "_COUNT_",
- "function": {
- "expression": "COUNT",
- "returntype": "bigint",
- "parameter": {
- "type": "constant",
- "value": "1"
- }
- }
- }
- ],
- "rowkey": {
- "rowkey_columns": [],
- "aggregation_groups": []
- },
- "notify_list": [],
- "hbase_mapping": {
- "column_family": []
- },
- "retention_range":"0",
- "auto_merge_time_ranges":[604800000,2419200000]
- };
-
- return cubeMeta;
- };
+ //
+ this.createNew = function () {
+ var cubeMeta = {
+ "name": "",
+ "model_name": "",
+ "description": "",
+ "dimensions": [],
+ "measures": [
+ {
+ "id": 1,
+ "name": "_COUNT_",
+ "function": {
+ "expression": "COUNT",
+ "returntype": "bigint",
+ "parameter": {
+ "type": "constant",
+ "value": "1"
+ }
+ }
+ }
+ ],
+ "rowkey": {
+ "rowkey_columns": [],
+ "aggregation_groups": []
+ },
+ "notify_list": [],
+ "hbase_mapping": {
+ "column_family": []
+ },
+ "retention_range": "0",
+ "auto_merge_time_ranges": [604800000, 2419200000],
+ "engine_type": 2
+ };
- this.createMeasure = function (){
- var measure = {
- "id": "",
- "name": "",
- "function": {
- "expression": "",
- "returntype": "",
- "parameter": {
- "type": "",
- "value": ""
- }
- }
- };
+ return cubeMeta;
+ };
- return measure;
+ this.createMeasure = function () {
+ var measure = {
+ "id": "",
+ "name": "",
+ "function": {
+ "expression": "",
+ "returntype": "",
+ "parameter": {
+ "type": "",
+ "value": ""
}
+ }
+ };
+
+ return measure;
+ }
})
[3/3] incubator-kylin git commit: force streaming proecess to use
correct log4j properties
Posted by ma...@apache.org.
force streaming proecess to use correct log4j properties
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bdada03f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bdada03f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bdada03f
Branch: refs/heads/devstreaming
Commit: bdada03f70a103dcd273356c3eb1cebefc43f4c9
Parents: b66c258
Author: honma <ho...@ebay.com>
Authored: Thu Oct 15 16:59:31 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Oct 15 16:59:31 2015 +0800
----------------------------------------------------------------------
bin/kylin.sh | 9 +--------
.../java/org/apache/kylin/job/streaming/StreamingCLI.java | 7 +++++++
2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bdada03f/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index 2e9cf24..cecee27 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -91,11 +91,6 @@ then
fi
if [ $2 == "start" ]
then
- useSandbox=`sh ${dir}/get-properties.sh kylin.sandbox`
- spring_profile="default"
- if [ "$useSandbox" = "true" ]
- then spring_profile="sandbox"
- fi
#retrive $hive_dependency and $hbase_dependency
source ${dir}/find-hive-dependency.sh
@@ -110,11 +105,9 @@ then
# KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
hbase ${KYLIN_EXTRA_START_OPTS} \
- -Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager \
- -Dorg.apache.catalina.connector.CoyoteAdapter.ALLOW_BACKSLASH=true \
-Dkylin.hive.dependency=${hive_dependency} \
-Dkylin.hbase.dependency=${hbase_dependency} \
- -Dspring.profiles.active=${spring_profile} \
+ -Dlog4j.configuration=kylinlog4j.properties \
org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
echo "streaming started name: $3 id: $4"
exit 0
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bdada03f/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 8346ec0..a8bbb8a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -51,6 +51,12 @@ public class StreamingCLI {
public static void main(String[] args) {
try {
+ System.out.println("hi");
+ logger.info("In StreamingCLI");
+ for (String arg : args) {
+ logger.info(arg);
+ }
+
AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
Preconditions.checkArgument(args[0].equals("streaming"));
@@ -91,6 +97,7 @@ public class StreamingCLI {
logger.info("streaming process stop, exit with 0");
System.exit(0);
} catch (Exception e) {
+ e.printStackTrace();
printArgsError(args);
logger.error("error start streaming", e);
System.exit(-1);