You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/07 03:26:02 UTC
incubator-kylin git commit: KYLIN-808 fix time str dict merge issue
Repository: incubator-kylin
Updated Branches:
refs/heads/newcase [created] c2a469c49
KYLIN-808 fix time str dict merge issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c2a469c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c2a469c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c2a469c4
Branch: refs/heads/newcase
Commit: c2a469c49da9d495bf648bd34afc3da4420536fc
Parents: dda27de
Author: honma <ho...@ebay.com>
Authored: Fri Jun 5 17:19:02 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Sun Jun 7 09:22:39 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 2 +-
.../apache/kylin/dict/DictionaryGenerator.java | 17 +++----
.../apache/kylin/dict/DictionaryManager.java | 51 +++++++++++++------
.../apache/kylin/dict/TimeStrDictionary.java | 8 +++
.../kylin/dict/TimeStrDictionaryTests.java | 2 +
.../cubev2/MergeCuboidFromHBaseMapper.java | 15 +++---
.../kylin/job/streaming/StreamingBootstrap.java | 2 +-
.../job/hadoop/cube/MergeCuboidMapperTest.java | 52 ++++++++++----------
.../apache/kylin/rest/service/BasicService.java | 5 ++
.../apache/kylin/rest/service/CacheService.java | 6 ++-
.../apache/kylin/streaming/KafkaConsumer.java | 3 +-
11 files changed, 101 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ff1cd49..73f708d 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -77,7 +77,7 @@ public class BasicTest {
@Test
@Ignore("convenient trial tool for dev")
public void test1() throws Exception {
- System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433423490000L));
+ System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433496233000L));
System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-01 00:00:00"));
System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-05-15 17:00:00"));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 7c1f808..a8e27b3 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -45,12 +45,6 @@ public class DictionaryGenerator {
private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd" };
- public static Dictionary<?> buildDictionaryFromValueList(DictionaryInfo info, Collection<byte[]> values) {
- Dictionary<?> dict = buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values);
- info.setCardinality(dict.getSize());
- return dict;
- }
-
public static Dictionary<?> buildDictionaryFromValueList(DataType dataType, Collection<byte[]> values) {
Preconditions.checkNotNull(dataType, "dataType cannot be null");
Dictionary dict;
@@ -83,7 +77,7 @@ public class DictionaryGenerator {
return dict;
}
- public static Dictionary mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) {
+ public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) {
HashSet<byte[]> dedup = new HashSet<byte[]>();
@@ -101,7 +95,8 @@ public class DictionaryGenerator {
List<byte[]> valueList = new ArrayList<byte[]>();
valueList.addAll(dedup);
- return buildDictionaryFromValueList(targetInfo, valueList);
+ Dictionary<?> dict = buildDictionaryFromValueList(dataType, valueList);
+ return dict;
}
public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
@@ -109,11 +104,13 @@ public class DictionaryGenerator {
// currently all data types are casted to string to build dictionary
// String dataType = info.getDataType();
- logger.debug("Building dictionary " + JsonUtil.writeValueAsString(info));
+ logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info));
ArrayList<byte[]> values = loadColumnValues(inpTable, info.getSourceColumnIndex());
- return buildDictionaryFromValueList(info, values);
+ Dictionary<?> dict = buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values);
+
+ return dict;
}
private static Dictionary buildDateTimeDict(Collection<byte[]> values, int baseId, int nSamples, ArrayList samples) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index be45899..4b61903 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,31 +18,32 @@
package org.apache.kylin.dict;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.google.common.base.Preconditions;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.TableSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.dict.lookup.HiveTable;
import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
public class DictionaryManager {
@@ -116,6 +117,8 @@ public class DictionaryManager {
}
public DictionaryInfo mergeDictionary(List<DictionaryInfo> dicts) throws IOException {
+ Preconditions.checkArgument(dicts.size() >= 2, "dicts size not valid");
+
DictionaryInfo firstDictInfo = null;
int totalSize = 0;
for (DictionaryInfo info : dicts) {
@@ -134,6 +137,7 @@ public class DictionaryManager {
throw new IllegalArgumentException("DictionaryManager.mergeDictionary input cannot be null");
}
+ //identical
DictionaryInfo newDictInfo = new DictionaryInfo(firstDictInfo);
TableSignature signature = newDictInfo.getInput();
signature.setSize(totalSize);
@@ -146,9 +150,23 @@ public class DictionaryManager {
return getDictionaryInfo(dupDict);
}
- Dictionary<?> newDict = DictionaryGenerator.mergeDictionaries(newDictInfo, dicts);
+ //check for cases where merging dicts are actually same
+ boolean identicalSourceDicts = true;
+ for (int i = 1; i < dicts.size(); ++i) {
+ if (!dicts.get(0).getDictionaryObject().equals(dicts.get(i).getDictionaryObject())) {
+ identicalSourceDicts = false;
+ break;
+ }
+ }
- return trySaveNewDict(newDict, newDictInfo);
+ if (identicalSourceDicts) {
+ logger.info("Use one of the merging dictionaries directly");
+ return dicts.get(0);
+ } else {
+ Dictionary<?> newDict = DictionaryGenerator.mergeDictionaries(DataType.getInstance(newDictInfo.getDataType()), dicts);
+ newDictInfo.setCardinality(newDict.getSize());
+ return trySaveNewDict(newDict, newDictInfo);
+ }
}
public DictionaryInfo buildDictionary(DataModelDesc model, String dict, TblColRef col, String factColumnsPath) throws IOException {
@@ -170,6 +188,7 @@ public class DictionaryManager {
}
Dictionary<?> dictionary = DictionaryGenerator.buildDictionary(dictInfo, inpTable);
+ dictInfo.setCardinality(dictionary.getSize());
return trySaveNewDict(dictionary, dictInfo);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index fde17a8..3759d23 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -105,6 +105,14 @@ public class TimeStrDictionary extends Dictionary<String> {
}
@Override
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+
+ return o instanceof TimeStrDictionary;
+ }
+
+ @Override
public void write(DataOutput out) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java b/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
index 92f62fb..93b2dae 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
@@ -57,4 +57,6 @@ public class TimeStrDictionaryTests {
Assert.assertEquals(origin, back);
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
index ef59d9b..a8283aa 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
@@ -24,9 +24,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.SplittedBytes;
@@ -44,7 +44,6 @@ import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.cube.KeyValueCreator;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -71,7 +70,7 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
// life cycle
-// private Text outputKey = new Text();
+ // private Text outputKey = new Text();
private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
private byte[] newKeyBuf;
@@ -102,7 +101,6 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
}
}
-
private CubeSegment findSegmentWithHTable(String htable, CubeInstance cubeInstance) {
for (CubeSegment segment : cubeInstance.getSegments()) {
String segmentHtable = segment.getStorageLocationIdentifier();
@@ -189,8 +187,14 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
}
int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+
int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
- int idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+ int idInMergedDict;
+ if (size < 0) {
+ idInMergedDict = mergedDict.nullId();
+ } else {
+ idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+ }
BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
bufOffset += mergedDict.getSizeOfId();
@@ -232,5 +236,4 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
context.write(outputKey, outputValue);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index fd590d2..a3036e2 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -172,7 +172,7 @@ public class StreamingBootstrap {
int batchInterval = 5 * 60 * 1000;
MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
- long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
+ long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis() - 30 * 60 * 1000, (long) batchInterval) : cubeInstance.getDateRangeEnd();
logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 83b995a..b1d37bb 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -22,13 +22,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.dict.*;
import org.apache.kylin.dict.lookup.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
import org.junit.After;
@@ -73,7 +74,8 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
List<byte[]> values = new ArrayList<byte[]>();
values.add(new byte[] { 101, 101, 101 });
values.add(new byte[] { 102, 102, 102 });
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(newDictInfo, values);
+ Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+ newDictInfo.setCardinality(dict.getSize());
dictionaryManager.trySaveNewDict(dict, newDictInfo);
((TrieDictionary) dict).dump(System.out);
@@ -125,7 +127,8 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
values.add(new byte[] { 99, 99, 99 });
else
values.add(new byte[] { 98, 98, 98 });
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(newDictInfo, values);
+ Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+ newDictInfo.setCardinality(dict.getSize());
dictionaryManager.trySaveNewDict(dict, newDictInfo);
((TrieDictionary) dict).dump(System.out);
@@ -139,7 +142,6 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
isFirstSegment = false;
}
-
CubeBuilder cubeBuilder = new CubeBuilder(cube);
cubeBuilder.setToUpdateSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
cube = cubeManager.updateCube(cubeBuilder);
@@ -155,33 +157,33 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
@Test
public void test() throws IOException, ParseException {
-// String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments";
+ // String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments";
CubeSegment newSeg = cubeManager.mergeSegments(cube, 0L, 1386835200000L);
-// String segmentName = newSeg.getName();
+ // String segmentName = newSeg.getName();
final Dictionary<?> dictionary = cubeManager.getDictionary(newSeg, lfn);
assertTrue(dictionary == null);
-// ((TrieDictionary) dictionary).dump(System.out);
+ // ((TrieDictionary) dictionary).dump(System.out);
// hack for distributed cache
-// File metaDir = new File("../job/meta");
-// FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), metaDir);
-//
-// mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-// mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-// // mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL,
-// // "../job/meta");
-//
-// byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 };
-// byte[] value = new byte[] { 1, 2, 3 };
-// byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 2 };
-// byte[] newvalue = new byte[] { 1, 2, 3 };
-//
-// mapDriver.withInput(new Text(key), new Text(value));
-// mapDriver.withOutput(new Text(newkey), new Text(newvalue));
-// mapDriver.setMapInputPath(new Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid"));
-//
-// mapDriver.runTest();
+ // File metaDir = new File("../job/meta");
+ // FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), metaDir);
+ //
+ // mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ // mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ // // mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL,
+ // // "../job/meta");
+ //
+ // byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 };
+ // byte[] value = new byte[] { 1, 2, 3 };
+ // byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 2 };
+ // byte[] newvalue = new byte[] { 1, 2, 3 };
+ //
+ // mapDriver.withInput(new Text(key), new Text(value));
+ // mapDriver.withOutput(new Text(newkey), new Text(newvalue));
+ // mapDriver.setMapInputPath(new Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid"));
+ //
+ // mapDriver.runTest();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index c9eb150..0a8c808 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -83,6 +83,11 @@ public abstract class BasicService {
}
}
+ protected void cleanAllDataCache() {
+ if (cacheManager != null)
+ cacheManager.clearAll();
+ }
+
public void removeOLAPDataSource(String project) {
logger.info("removeOLAPDataSource is called for project " + project);
if (StringUtils.isEmpty(project))
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 56f9d4b..6cc0de3 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -27,11 +27,13 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.invertedindex.IIDescManager;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.job.cube.CubingJob;
import org.apache.kylin.job.cube.CubingJobBuilder;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.rest.constant.Constant;
@@ -101,12 +103,14 @@ public class CacheService extends BasicService {
CubeDescManager.clearCache();
break;
case ALL:
- getMetadataManager().reload();
+ MetadataManager.clearCache();
+ DictionaryManager.clearCache();
CubeDescManager.clearCache();
CubeManager.clearCache();
IIDescManager.clearCache();
IIManager.clearCache();
ProjectManager.clearCache();
+ cleanAllDataCache();
removeAllOLAPDataSources();
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 488f6e3..6f5e279 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -111,7 +111,6 @@ public class KafkaConsumer implements Runnable {
while (isRunning) {
int consumeMsgCountAtBeginning = consumeMsgCount;
fetchRound++;
- logger.info("start " + fetchRound + "th round of fetching");
if (leadBroker == null) {
leadBroker = getLeadBroker();
@@ -141,7 +140,7 @@ public class KafkaConsumer implements Runnable {
offset++;
consumeMsgCount++;
}
- logger.info("Number of messages consumed: " + consumeMsgCount + " offset is " + offset);
+ logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
if (consumeMsgCount == consumeMsgCountAtBeginning)//nothing this round
{