You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/27 12:57:52 UTC
incubator-kylin git commit: GTAggregateScanner bug fix to pass CI
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 a499adff8 -> db6cbe288
GTAggregateScanner bug fix to pass CI
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/db6cbe28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/db6cbe28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/db6cbe28
Branch: refs/heads/0.8.0
Commit: db6cbe2887d09c21d40eb35554cf5e57bb064ebd
Parents: a499adf
Author: Li, Yang <ya...@ebay.com>
Authored: Wed May 27 18:56:40 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed May 27 18:57:25 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/HadoopUtil.java | 11 +-
.../kylin/dict/lookup/FileTableReader.java | 1 +
.../storage/gridtable/GTAggregateScanner.java | 164 +++++--------------
.../storage/gridtable/DictGridTableTest.java | 2 +-
4 files changed, 55 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 84bab94..2486d6c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -57,11 +57,20 @@ public class HadoopUtil {
public static URI makeURI(String filePath) {
try {
- return new URI(filePath);
+ return new URI(fixWindowsPath(filePath));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
}
}
+
+ public static String fixWindowsPath(String path) {
+ // fix windows path
+ if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
+ path = path.replace("file://", "file:///");
+ path = path.replace('\\', '/');
+ }
+ return path;
+ }
public static Configuration newHadoopJobConfiguration() {
Configuration conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
index e5a80c0..3631b25 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -65,6 +65,7 @@ public class FileTableReader implements TableReader {
}
public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
+ filePath = HadoopUtil.fixWindowsPath(filePath);
this.filePath = filePath;
this.delim = delim;
this.expectedColumnNumber = expectedColumnNumber;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index fff2b77..ef41e05 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -2,6 +2,7 @@ package org.apache.kylin.storage.gridtable;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.slf4j.Logger;
@@ -15,9 +16,12 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class GTAggregateScanner implements IGTScanner {
+ @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
+
final GTInfo info;
final BitSet dimensions; // dimensions to return, can be more than group by
final BitSet groupBy;
@@ -25,11 +29,10 @@ public class GTAggregateScanner implements IGTScanner {
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
-
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
if (req.hasAggregation() == false)
throw new IllegalStateException();
-
+
this.info = inputScanner.getInfo();
this.dimensions = (BitSet) req.getColumns().clone();
this.dimensions.andNot(req.getAggrMetrics());
@@ -43,7 +46,7 @@ public class GTAggregateScanner implements IGTScanner {
public GTInfo getInfo() {
return info;
}
-
+
@Override
public int getScannedRowCount() {
return inputScanner.getScannedRowCount();
@@ -71,20 +74,25 @@ public class GTAggregateScanner implements IGTScanner {
class AggregationCacheWithBytesKey {
final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+ final int keyLength;
+ final boolean[] compareMask;
public AggregationCacheWithBytesKey() {
+ compareMask = createCompareMask();
+ keyLength = compareMask.length;
aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
int result = 0;
- Preconditions.checkArgument(o1.length == o2.length);
- final int length = o1.length;
- for (int i = 0; i < length; ++i) {
- result = o1[i] - o2[i];
- if (result == 0) {
- continue;
- } else {
- return result;
+ Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+ for (int i = 0; i < keyLength; ++i) {
+ if (compareMask[i]) {
+ result = o1[i] - o2[i];
+ if (result == 0) {
+ continue;
+ } else {
+ return result;
+ }
}
}
return result;
@@ -92,10 +100,29 @@ public class GTAggregateScanner implements IGTScanner {
});
}
+ private boolean[] createCompareMask() {
+ int keyLength = 0;
+ for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
+ int l = info.codeSystem.maxCodeLength(i);
+ keyLength += l;
+ }
+
+ boolean[] mask = new boolean[keyLength];
+ int p = 0;
+ for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
+ int l = info.codeSystem.maxCodeLength(i);
+ boolean m = groupBy.get(i) ? true : false;
+ for (int j = 0; j < l; j++) {
+ mask[p++] = m;
+ }
+ }
+ return mask;
+ }
+
private byte[] createKey(GTRecord record) {
- byte[] result = new byte[info.getMaxColumnLength(groupBy)];
+ byte[] result = new byte[keyLength];
int offset = 0;
- for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+ for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
final ByteArray byteArray = record.cols[i];
final int columnLength = info.codeSystem.maxCodeLength(i);
System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, byteArray.length());
@@ -129,13 +156,7 @@ public class GTAggregateScanner implements IGTScanner {
final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- final GTRecord secondRecord;
-
- {
- BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
- dimensionsAndMetrics.or(metrics);
- secondRecord = new GTRecord(info, dimensionsAndMetrics);
- }
+ final GTRecord secondRecord = new GTRecord(info);
@Override
public boolean hasNext() {
@@ -151,7 +172,7 @@ public class GTAggregateScanner implements IGTScanner {
private void create(byte[] key, MeasureAggregator[] value) {
int offset = 0;
- for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+ for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
final int columnLength = info.codeSystem.maxCodeLength(i);
secondRecord.set(i, new ByteArray(key, offset, columnLength));
offset += columnLength;
@@ -173,103 +194,4 @@ public class GTAggregateScanner implements IGTScanner {
}
}
- /*
- @SuppressWarnings({ "rawtypes", "unchecked" })
- class AggregationCache {
- final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
-
- public AggregationCache() {
- this.aggBufMap = Maps.newTreeMap();
- }
-
- void aggregate(GTRecord r) {
- r.maskForEqualHashComp(groupBy);
- MeasureAggregator[] aggrs = aggBufMap.get(r);
- if (aggrs == null) {
- aggrs = new MeasureAggregator[metricsAggrFuncs.length];
- for (int i = 0, col = -1; i < aggrs.length; i++) {
- col = metrics.nextSetBit(col + 1);
- aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
- }
- aggBufMap.put(r.copy(dimensions), aggrs);
- }
-
- for (int i = 0, col = -1; i < aggrs.length; i++) {
- col = metrics.nextSetBit(col + 1);
- Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
- aggrs[i].aggregate(metrics);
- }
- }
-
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- final Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-
- final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- final GTRecord oneRecord; // avoid instance creation
-
- {
- BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
- dimensionsAndMetrics.or(metrics);
- oneRecord = new GTRecord(info, dimensionsAndMetrics);
- }
-
- @Override
- public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override
- public GTRecord next() {
- Entry<GTRecord, MeasureAggregator[]> entry = it.next();
-
- GTRecord dims = entry.getKey();
- for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
- oneRecord.cols[i].set(dims.cols[i]);
- }
-
- metricsBuf.clear();
- MeasureAggregator[] aggrs = entry.getValue();
- for (int i = 0, col = -1; i < aggrs.length; i++) {
- col = metrics.nextSetBit(col + 1);
- int pos = metricsBuf.position();
- info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
- oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
- }
- return oneRecord;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- transient int rowMemBytes;
- static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
-
- public void checkMemoryUsage() {
- // about memory calculation,
- // http://seniorjava.wordpress.com/2013/09/01/java-objects-memory-size-reference/
- if (rowMemBytes <= 0) {
- if (aggBufMap.size() > 0) {
- rowMemBytes = 0;
- MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey());
- for (MeasureAggregator agg : measureAggregators) {
- rowMemBytes += agg.getMemBytes();
- }
- }
- }
- int size = aggBufMap.size();
- int memUsage = (40 + rowMemBytes) * size;
- if (memUsage > MEMORY_USAGE_CAP) {
- throw new RuntimeException("Kylin coprocess memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abord coprocessor.");
- }
- }
- }
-
- */
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index 3607175..af47b89 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -69,7 +69,7 @@ public class DictGridTableTest {
List<GTScanRange> r = planner.planScanRanges(filter);
assertEquals(1, r.size());
assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
- assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).hbaseFuzzyKeys.toString());
+ assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
}
// pre-evaluate ever false