You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:31:49 UTC
[04/30] kylin git commit: KYLIN-1762 fix query test error
KYLIN-1762 fix query test error
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab5563a8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab5563a8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab5563a8
Branch: refs/heads/master-hbase1.x
Commit: ab5563a8ec060fba48ec8f43244bed6f887b0e83
Parents: be18158
Author: shaofengshi <sh...@apache.org>
Authored: Sun Sep 25 21:41:37 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/Kafka10DataLoader.java | 2 +-
.../cube/model/CubeJoinedFlatTableDesc.java | 2 +-
.../mr/steps/FactDistinctColumnPartitioner.java | 3 ++
.../engine/mr/steps/FactDistinctColumnsJob.java | 2 +-
.../mr/steps/FactDistinctColumnsReducer.java | 38 +++++++++-----
.../mr/steps/FactDistinctHiveColumnsMapper.java | 49 ++++++++++++++++--
.../kafka/DEFAULT.STREAMING_TABLE.json | 1 +
.../kylin/provision/BuildCubeWithStream.java | 52 ++++++++++++--------
.../kylin/provision/BuildCubeWithStream2.java | 4 +-
.../apache/kylin/query/ITKylinQueryTest.java | 3 ++
.../org/apache/kylin/query/KylinTestBase.java | 2 +-
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../source/kafka/TimedJsonStreamParser.java | 7 +--
13 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 2b299cc..8c548be 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -65,7 +65,7 @@ public class Kafka10DataLoader extends StreamDataLoader {
props.put("retry.backoff.ms", "1000");
KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
- for (int i = 0; i < messages.size(); ++i) {
+ for (int i = 0; i < messages.size(); i++) {
ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
producer.send(keyedMessage);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6ca89c8..5cd4f1d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -143,7 +143,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
public int getColumnIndex(TblColRef colRef) {
Integer index = columnIndexMap.get(colRef);
if (index == null)
- throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table.");
+ return -1;
return index.intValue();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index a631cf4..6973c4b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -34,6 +34,9 @@ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
// the last reducer is for merging hll
return numReduceTasks - 1;
+ } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
+ // the last reducer is for merging hll
+ return numReduceTasks - 2;
} else {
int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
return colIndex;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index a6c4d30..a9cc17f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
System.out.println("Found segment " + segment);
}
setupMapper(cube.getSegmentById(segmentID));
- setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 1 : columnsNeedDict.size());
+ setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
attachKylinPropsAndMetadata(cube, job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 0c13df7..2889ba8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -65,7 +65,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
private List<ByteArray> colValues;
private TblColRef col = null;
private boolean isStatistics = false;
- private boolean outputTouched = false;
+ private boolean isPartitionCol = false;
private KylinConfig cubeConfig;
protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
@@ -92,25 +92,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
baseCuboidRowCountInMappers = Lists.newArrayList();
cuboidHLLMap = Maps.newHashMap();
samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+ } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
+ // partition col
+ isStatistics = false;
+ isPartitionCol = true;
+ col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ colValues = Lists.newLinkedList();
} else {
// col
isStatistics = false;
+ isPartitionCol = false;
col = columnList.get(taskId);
- colValues = Lists.newArrayList();
+ colValues = Lists.newLinkedList();
}
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- if (isStatistics == false) {
- colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
- if (colValues.size() == 1000000) { //spill every 1 million
- logger.info("spill values to disk...");
- outputDistinctValues(col, colValues, context);
- colValues.clear();
- }
- } else {
+ if (isStatistics == true) {
// for hll
long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
for (Text value : values) {
@@ -130,6 +130,21 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
cuboidHLLMap.put(cuboidId, hll);
}
}
+ } else if (isPartitionCol == true) {
+ // for partition col min/max value
+ ByteArray value = new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1));
+ if (colValues.size() > 1) {
+ colValues.set(1, value);
+ } else {
+ colValues.add(value);
+ }
+ } else {
+ colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+ if (colValues.size() == 1000000) { //spill every 1 million
+ logger.info("spill values to disk...");
+ outputDistinctValues(col, colValues, context);
+ colValues.clear();
+ }
}
}
@@ -156,7 +171,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
}
} finally {
IOUtils.closeQuietly(out);
- outputTouched = true;
}
}
@@ -164,7 +178,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
protected void cleanup(Context context) throws IOException, InterruptedException {
if (isStatistics == false) {
- if (!outputTouched || colValues.size() > 0) {
+ if (colValues.size() > 0) {
outputDistinctValues(col, colValues, context);
colValues.clear();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 5e278f8..86ef487 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+import org.apache.kylin.metadata.model.TblColRef;
/**
*/
@@ -52,8 +53,12 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
private ByteArray[] row_hashcodes = null;
private ByteBuffer keyBuffer;
private static final Text EMPTY_TEXT = new Text();
+ public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
public static final byte MARK_FOR_HLL = (byte) 0xFF;
+ private int partitionColumnIndex = -1;
+ private boolean needFetchPartitionCol = true;
+
@Override
protected void setup(Context context) throws IOException {
super.setup(context);
@@ -81,6 +86,26 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
for (int i = 0; i < nRowKey; i++) {
row_hashcodes[i] = new ByteArray();
}
+
+ TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ if (partitionColRef != null) {
+ partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
+ }
+
+ // check whether need fetch the partition col values
+ if (partitionColumnIndex < 0) {
+ // if partition col not on cube, no need
+ needFetchPartitionCol = false;
+ } else {
+ for (int x : dictionaryColumnIndex) {
+ if (x == partitionColumnIndex) {
+ // if partition col already build dict, no need
+ needFetchPartitionCol = false;
+ break;
+ }
+ }
+ }
+
}
}
@@ -108,24 +133,38 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
@Override
public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
String[] row = flatTableInputFormat.parseMapperInput(record);
+
+ keyBuffer.clear();
try {
for (int i = 0; i < factDictCols.size(); i++) {
String fieldValue = row[dictionaryColumnIndex[i]];
if (fieldValue == null)
continue;
-
- keyBuffer.clear();
+ int offset = keyBuffer.position();
keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
keyBuffer.put(Bytes.toBytes(fieldValue));
- outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+ outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
context.write(outputKey, EMPTY_TEXT);
}
} catch (Exception ex) {
handleErrorRecord(row, ex);
}
- if (collectStatistics && rowCount < samplingPercentage) {
- putRowKeyToHLL(row);
+ if (collectStatistics) {
+ if (rowCount < samplingPercentage) {
+ putRowKeyToHLL(row);
+ }
+
+ if (needFetchPartitionCol == true) {
+ String fieldValue = row[partitionColumnIndex];
+ if (fieldValue != null) {
+ int offset = keyBuffer.position();
+ keyBuffer.put(MARK_FOR_PARTITION_COL);
+ keyBuffer.put(Bytes.toBytes(fieldValue));
+ outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
+ context.write(outputKey, EMPTY_TEXT);
+ }
+ }
}
if (rowCount++ == 100)
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
index 6a64cce..e3ac2d6 100644
--- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
@@ -6,6 +6,7 @@
"timeout": 60000,
"bufferSize": 65536,
"parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
+ "parserProperties": "tsColName=timestamp",
"last_modified": 0,
"clusters": [
{
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index bfe1d0a..dfcedfb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -69,10 +69,19 @@ public class BuildCubeWithStream {
private KafkaConfig kafkaConfig;
private MockKafka kafkaServer;
+ protected static boolean fastBuildMode = false;
public void before() throws Exception {
deployEnv();
+ String fastModeStr = System.getProperty("fastBuildMode");
+ if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) {
+ fastBuildMode = true;
+ logger.info("Will use fast build mode");
+ } else {
+ logger.info("Will not use fast build mode");
+ }
+
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
jobService = ExecutableManager.getInstance(kylinConfig);
scheduler = DefaultScheduler.createInstance();
@@ -139,29 +148,32 @@ public class BuildCubeWithStream {
generateStreamData(date1, date2, numberOfRecrods1);
ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
Assert.assertTrue(result == ExecutableState.SUCCEED);
- long date3 = f.parse("2013-04-01").getTime();
- int numberOfRecords2 = 5000;
- generateStreamData(date2, date3, numberOfRecords2);
- result = buildSegment(cubeName, 0, Long.MAX_VALUE);
- Assert.assertTrue(result == ExecutableState.SUCCEED);
- //empty build
- result = buildSegment(cubeName, 0, Long.MAX_VALUE);
- Assert.assertTrue(result == ExecutableState.DISCARDED);
+ if (fastBuildMode == false) {
+ long date3 = f.parse("2013-04-01").getTime();
+ int numberOfRecords2 = 5000;
+ generateStreamData(date2, date3, numberOfRecords2);
+ result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ Assert.assertTrue(result == ExecutableState.SUCCEED);
- //merge
- result = mergeSegment(cubeName, 0, 15000);
- Assert.assertTrue(result == ExecutableState.SUCCEED);
+ //empty build
+ result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+ Assert.assertTrue(result == ExecutableState.DISCARDED);
+
+ //merge
+ result = mergeSegment(cubeName, 0, 15000);
+ Assert.assertTrue(result == ExecutableState.SUCCEED);
- List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
- Assert.assertTrue(segments.size() == 1);
+ List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
+ Assert.assertTrue(segments.size() == 1);
- CubeSegment toRefreshSeg = segments.get(0);
- HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
+ CubeSegment toRefreshSeg = segments.get(0);
+ HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
- refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
- segments = cubeManager.getCube(cubeName).getSegments();
- Assert.assertTrue(segments.size() == 1);
+ refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
+ segments = cubeManager.getCube(cubeName).getSegments();
+ Assert.assertTrue(segments.size() == 1);
+ }
}
@@ -197,8 +209,8 @@ public class BuildCubeWithStream {
protected void deployEnv() throws IOException {
DeployUtil.overrideJobJarLocations();
- //DeployUtil.initCliWorkDir();
- //DeployUtil.deployMetadata();
+// DeployUtil.initCliWorkDir();
+// DeployUtil.deployMetadata();
}
public static void beforeClass() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
index 7959701..d8c857f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
@@ -45,7 +45,7 @@ import static java.lang.Thread.sleep;
public class BuildCubeWithStream2 extends BuildCubeWithStream {
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
- private static boolean generateData = true;
+ private boolean generateData = true;
@Override
public void build() throws Exception {
@@ -76,6 +76,7 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream {
List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
+ Thread.sleep(2 * 60 * 1000); // sleep 2 mintues
FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
@Override
public ExecutableState call() {
@@ -92,7 +93,6 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream {
executorService.submit(futureTask);
futures.add(futureTask);
- Thread.sleep(2 * 60 * 1000); // sleep 2 mintues
}
generateData = false; // stop generating message to kafka
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 59a3a04..93d47f1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -65,6 +66,8 @@ public class ITKylinQueryTest extends KylinTestBase {
RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
+
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index d0bcf52..57c4f4d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -586,7 +586,7 @@ public class KylinTestBase {
//setup cube conn
File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
Properties props = new Properties();
- props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "15001");
+ props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "20001");
cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
//setup h2
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index a5f678f..729719a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -118,7 +118,7 @@ public class KafkaMRInput implements IMRInput {
}
}
Text text = (Text) mapperInput;
- ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice();
+ ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength());
StreamingMessage streamingMessage = streamingParser.parse(buffer);
return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index d3530f1..148ae25 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -47,7 +47,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
private List<TblColRef> allColumns;
- private boolean formatTs = false;//not used
private final ObjectMapper mapper = new ObjectMapper();
private String tsColName = "timestamp";
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
@@ -61,9 +60,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
String[] parts = prop.split("=");
if (parts.length == 2) {
switch (parts[0]) {
- case "formatTs":
- this.formatTs = Boolean.valueOf(parts[1]);
- break;
case "tsColName":
this.tsColName = parts[1];
break;
@@ -78,7 +74,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
}
- logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName);
+ logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
}
@Override
@@ -105,7 +101,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
}
- logger.info("Streaming Message: " + result.toString());
return new StreamingMessage(result, 0, t, Collections.<String, Object> emptyMap());
} catch (IOException e) {
logger.error("error", e);