You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/10/26 06:41:09 UTC
[1/2] incubator-kylin git commit: # This is a combination of 10 commits. # The first commit's message is: KYLIN-1068 Optimize the memory footprint for TopN counter
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 06aeb994e -> d0e0d9fc9
# This is a combination of 10 commits.
# The first commit's message is:
KYLIN-1068 Optimize the memory footprint for TopN counter
# This is the 2nd commit message:
KYLIN-1068 small code refactor
# This is the 3rd commit message:
KYLIN-1068 remove “error” from Counter as it is invisible
# This is the 4th commit message:
KYLIN-1066 for sandbox test, limit max reducer to 5
# This is the 5th commit message:
KYLIN-1068 use a bigger random range for the PRICE column in the test_kylin_fact table
# This is the 6th commit message:
KYLIN-1068 small changes
# This is the 7th commit message:
KYLIN-1068 add log in htable cleanup
# This is the 8th commit message:
KYLIN-1068 cleanup test case
# This is the 9th commit message:
KYLIN-1068 code refine
# This is the 10th commit message:
KYLIN-1068 only build 1 segment for test cubes to accelerate CI
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1b2141d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1b2141d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1b2141d
Branch: refs/heads/2.x-staging
Commit: d1b2141dfad5309897f6efc64c5f743f3c10c0b3
Parents: 06aeb99
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 12 22:05:32 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 26 13:30:16 2015 +0800
----------------------------------------------------------------------
.../kylin/job/BuildCubeWithEngineTest.java | 11 +-
.../kylin/job/BuildCubeWithStreamTest.java | 5 +-
.../java/org/apache/kylin/job/DataGenTest.java | 4 +-
.../kylin/job/hadoop/invertedindex/IITest.java | 1 -
.../org/apache/kylin/common/topn/Counter.java | 18 +-
.../kylin/common/topn/DoublyLinkedList.java | 84 ++-----
.../apache/kylin/common/topn/TopNCounter.java | 234 ++++++-------------
.../kylin/common/topn/TopNCounterBasicTest.java | 4 +-
.../common/topn/TopNCounterCombinationTest.java | 1 -
.../kylin/common/topn/TopNCounterTest.java | 2 +-
.../kylin/metadata/measure/TopNAggregator.java | 6 +-
.../serializer/TopNCounterSerializer.java | 23 +-
.../localmeta/data/data_gen_config.json | 8 +
.../test_case_data/sandbox/kylin.properties | 8 +-
.../hbase/steps/HBaseMROutput2Transition.java | 1 +
.../storage/hbase/util/StorageCleanupJob.java | 4 +-
16 files changed, 153 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 5be11f1..26ad960 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -196,8 +196,8 @@ public class BuildCubeWithEngineTest {
long date2 = f.parse("2013-01-01").getTime();
long date3 = f.parse("2022-01-01").getTime();
List<String> result = Lists.newArrayList();
- result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
- result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+ result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3));
+// result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
return result;
}
@@ -229,6 +229,8 @@ public class BuildCubeWithEngineTest {
final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
// this cube's start date is 0, end date is 20120601000000
long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+
+ /*
long dateEnd = f.parse("2012-06-01").getTime();
clearSegment(cubeName);
@@ -245,6 +247,11 @@ public class BuildCubeWithEngineTest {
dateEnd = f.parse("2023-01-01").getTime();
result.add(buildSegment(cubeName, dateStart, dateEnd));
+*/
+ long dateEnd = f.parse("2023-01-01").getTime();
+
+ clearSegment(cubeName);
+ result.add(buildSegment(cubeName, dateStart, dateEnd));
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 07ab947..63bba70 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -113,7 +113,10 @@ public class BuildCubeWithStreamTest {
private static void backup() throws Exception {
int exitCode = cleanupOldStorage();
if (exitCode == 0) {
- exportHBaseData();
+ String exportHTables = System.getProperty("kylinExportHTables");
+ if (Boolean.parseBoolean(exportHTables) == true) {
+ exportHBaseData();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
index 5c01305..af4f9fb 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -46,11 +46,11 @@ public class DataGenTest extends LocalFileMetadataTestCase {
@Test
public void testBasics() throws Exception {
String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1", null);// default settings
- System.out.println(content);
+ //System.out.println(content);
assertTrue(content.contains("FP-non GTC"));
assertTrue(content.contains("ABIN"));
- DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
+ //DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 2a643c8..73f4098 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -69,7 +69,6 @@ public class IITest extends LocalFileMetadataTestCase {
String iiName = "test_kylin_ii_inner_join";
IIInstance ii;
IIDesc iiDesc;
- String cubeName = "test_kylin_cube_with_slr_empty";
List<IIRow> iiRows;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 866d3d8..2bca4df 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -31,11 +31,9 @@ import java.io.ObjectOutput;
*/
public class Counter<T> implements Externalizable {
- protected ListNode2<TopNCounter<T>.Bucket> bucketNode;
-
protected T item;
protected double count;
- protected double error;
+// protected double error;
/**
* For de-serialization
@@ -43,10 +41,9 @@ public class Counter<T> implements Externalizable {
public Counter() {
}
- public Counter(ListNode2<TopNCounter<T>.Bucket> bucket, T item) {
- this.bucketNode = bucket;
+ public Counter(T item) {
this.count = 0;
- this.error = 0;
+// this.error = 0;
this.item = item;
}
@@ -58,13 +55,14 @@ public class Counter<T> implements Externalizable {
return count;
}
- public double getError() {
- return error;
- }
+// public double getError() {
+// return error;
+// }
@Override
public String toString() {
- return item + ":" + count + ':' + error;
+// return item + ":" + count + ':' + error;
+ return item + ":" + count;
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index 0942b84..d268a30 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -26,11 +26,11 @@ import java.util.Iterator;
*
* @param <T>
*/
-public class DoublyLinkedList<T> implements Iterable<T> {
+public class DoublyLinkedList<T> {
- protected int size;
- protected ListNode2<T> tail;
- protected ListNode2<T> head;
+ private int size;
+ private ListNode2<T> tail;
+ private ListNode2<T> head;
/**
* Append to head of list
@@ -54,6 +54,11 @@ public class DoublyLinkedList<T> implements Iterable<T> {
*/
public ListNode2<T> enqueue(T value) {
ListNode2<T> node = new ListNode2<T>(value);
+
+ return enqueue(node);
+ }
+
+ public ListNode2<T> enqueue(ListNode2<T> node) {
if (size++ == 0) {
head = node;
} else {
@@ -97,6 +102,19 @@ public class DoublyLinkedList<T> implements Iterable<T> {
size++;
}
+
+ public void addBefore(ListNode2<T> node, ListNode2<T> newNode) {
+ newNode.prev = node.prev;
+ newNode.next = node;
+ node.prev = newNode;
+ if (newNode.prev == null) {
+ tail = newNode;
+ } else {
+ newNode.prev.next = newNode;
+ }
+ size++;
+ }
+
public void remove(ListNode2<T> node) {
if (node == tail) {
tail = node.next;
@@ -115,54 +133,7 @@ public class DoublyLinkedList<T> implements Iterable<T> {
public int size() {
return size;
}
-
-
- @Override
- public Iterator<T> iterator() {
- return new DoublyLinkedListIterator(this);
- }
-
- protected class DoublyLinkedListIterator implements Iterator<T> {
-
- protected DoublyLinkedList<T> list;
- protected ListNode2<T> itr;
- protected int length;
-
- public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
- this.length = list.size;
- this.list = list;
- this.itr = list.tail;
- }
-
- @Override
- public boolean hasNext() {
- return itr != null;
- }
-
- @Override
- public T next() {
- if (length != list.size) {
- throw new ConcurrentModificationException();
- }
- T next = itr.value;
- itr = itr.next;
- return next;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-
- public T first() {
- return tail == null ? null : tail.getValue();
- }
-
- public T last() {
- return head == null ? null : head.getValue();
- }
+
public ListNode2<T> head() {
return head;
@@ -176,13 +147,4 @@ public class DoublyLinkedList<T> implements Iterable<T> {
return size == 0;
}
- @SuppressWarnings("unchecked")
- public T[] toArray() {
- T[] a = (T[]) new Object[size];
- int i = 0;
- for (T v : this) {
- a[i++] = v;
- }
- return a;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 6814b8d..1856010 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -21,8 +21,6 @@ package org.apache.kylin.common.topn;
import com.google.common.collect.Lists;
import org.apache.kylin.common.util.Pair;
-import java.io.*;
-import java.nio.ByteBuffer;
import java.util.*;
/**
@@ -36,29 +34,12 @@ import java.util.*;
* @param <T> type of data in the stream to be summarized
*/
public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
-
- public static final int EXTRA_SPACE_RATE = 50;
-
-
- protected class Bucket {
-
- protected DoublyLinkedList<Counter<T>> counterList;
-
- private double count;
- public Bucket(double count) {
- this.count = count;
- this.counterList = new DoublyLinkedList<Counter<T>>();
- }
-
- public int size() {
- return counterList.size();
- }
- }
+ public static final int EXTRA_SPACE_RATE = 50;
protected int capacity;
private HashMap<T, ListNode2<Counter<T>>> counterMap;
- protected DoublyLinkedList<Bucket> bucketList;
+ protected DoublyLinkedList<Counter<T>> counterList;
/**
* @param capacity maximum size (larger capacities improve accuracy)
@@ -66,7 +47,7 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public TopNCounter(int capacity) {
this.capacity = capacity;
counterMap = new HashMap<T, ListNode2<Counter<T>>>();
- bucketList = new DoublyLinkedList<Bucket>();
+ counterList = new DoublyLinkedList<Counter<T>>();
}
public int getCapacity() {
@@ -114,15 +95,14 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
if (isNewItem) {
if (size() < capacity) {
- counterNode = bucketList.enqueue(new Bucket(0)).getValue().counterList.add(new Counter<T>(bucketList.tail(), item));
+ counterNode = counterList.enqueue(new Counter<T>(item));
} else {
- Bucket min = bucketList.first();
- counterNode = min.counterList.tail();
+ counterNode = counterList.tail();
Counter<T> counter = counterNode.getValue();
droppedItem = counter.item;
counterMap.remove(droppedItem);
counter.item = item;
- counter.error = min.count;
+ counter.count = 0.0;
}
counterMap.put(item, counterNode);
}
@@ -133,56 +113,35 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
}
protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
- Counter<T> counter = counterNode.getValue(); // count_i
- ListNode2<Bucket> oldNode = counter.bucketNode;
- Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
- bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
- counter.count = counter.count + incrementCount;
-
- // Finding the right bucket for count_i
- // Because we allow a single call to increment count more than once, this may not be the adjacent bucket.
- ListNode2<Bucket> bucketNodePrev = oldNode;
- ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
- while (bucketNodeNext != null) {
- Bucket bucketNext = bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
- if (counter.count == bucketNext.count) {
- bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
- break;
- } else if (counter.count > bucketNext.count) {
- bucketNodePrev = bucketNodeNext;
- bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
- } else {
- // A new bucket has to be created
- bucketNodeNext = null;
- }
+ Counter<T> counter = counterNode.getValue();
+ counter.count += incrementCount;
+
+ ListNode2<Counter<T>> nodeNext = counterNode.getNext();
+ counterList.remove(counterNode);
+ counterNode.prev = null;
+ counterNode.next = null;
+ while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
+ nodeNext = nodeNext.getNext();
}
- if (bucketNodeNext == null) {
- Bucket bucketNext = new Bucket(counter.count);
- bucketNext.counterList.add(counterNode);
- bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
+ if (nodeNext != null) {
+ counterList.addBefore(nodeNext, counterNode);
+ } else {
+ counterList.add(counterNode);
}
- counter.bucketNode = bucketNodeNext;
- //Cleaning up
- if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
- {
- bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
- }
}
@Override
public List<T> peek(int k) {
List<T> topK = new ArrayList<T>(k);
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- if (topK.size() == k) {
- return topK;
- }
- topK.add(c.item);
+ for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Counter<T> b = bNode.getValue();
+ if (topK.size() == k) {
+ return topK;
}
+ topK.add(b.item);
}
return topK;
@@ -191,14 +150,12 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public List<Counter<T>> topK(int k) {
List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- if (topK.size() == k) {
- return topK;
- }
- topK.add(c);
+ for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Counter<T> b = bNode.getValue();
+ if (topK.size() == k) {
+ return topK;
}
+ topK.add(b);
}
return topK;
@@ -215,48 +172,27 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append('[');
- for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
- Bucket b = bNode.getValue();
- sb.append('{');
+ for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
+ Counter<T> b = bNode.getValue();
+ sb.append(b.item);
+ sb.append(':');
sb.append(b.count);
- sb.append(":[");
- for (Counter<T> c : b.counterList) {
- sb.append('{');
- sb.append(c.item);
- sb.append(':');
- sb.append(c.error);
- sb.append("},");
- }
- if (b.counterList.size() > 0) {
- sb.deleteCharAt(sb.length() - 1);
- }
- sb.append("]},");
- }
- if (bucketList.size() > 0) {
- sb.deleteCharAt(sb.length() - 1);
}
sb.append(']');
return sb.toString();
}
- public void fromExternal(int size, double[] counters, List<T> items) {
- this.bucketList = new DoublyLinkedList<Bucket>();
-
- this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
-
- Bucket currentBucket = null;
- ListNode2<Bucket> currentBucketNode = null;
- for (int i = 0; i < size; i++) {
- Counter<T> c = new Counter<T>();
- c.count = counters[i];
- c.item = items.get(i);
- if (currentBucket == null || c.count != currentBucket.count) {
- currentBucket = new Bucket(c.count);
- currentBucketNode = bucketList.add(currentBucket);
- }
- c.bucketNode = currentBucketNode;
- counterMap.put(c.item, currentBucket.counterList.add(c));
- }
+ /**
+ * Put element to the head position;
+ * The consumer should call this method with count in ascending way; the item will be directly put to the head of the list, without comparison for best performance;
+ * @param item
+ * @param count
+ */
+ public void offerToHead(T item, double count) {
+ Counter<T> c = new Counter<T>(item);
+ c.count = count;
+ ListNode2<Counter<T>> node = counterList.add(c);
+ counterMap.put(c.item, node);
}
/**
@@ -273,33 +209,29 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public TopNCounter<T> merge(TopNCounter<T> another) {
double m1 = 0.0, m2 = 0.0;
if (this.size() >= this.capacity) {
- m1 = this.bucketList.tail().getValue().count;
+ m1 = this.counterList.tail().getValue().count;
}
if (another.size() >= another.capacity) {
- m2 = another.bucketList.tail().getValue().count;
+ m2 = another.counterList.tail().getValue().count;
}
-
+
for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
T item = entry.getKey();
ListNode2<Counter<T>> existing = another.counterMap.get(item);
if (existing != null) {
this.offer(item, another.counterMap.get(item).getValue().count);
- this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + another.counterMap.get(item).getValue().error;
another.counterMap.remove(item);
} else {
this.offer(item, m2);
- this.counterMap.get(item).getValue().error = entry.getValue().getValue().error + m2;
}
}
for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
T item = entry.getKey();
double counter = entry.getValue().getValue().count;
- double error = entry.getValue().getValue().error;
this.offer(item, counter + m1);
- this.counterMap.get(item).getValue().error = error + m1;
}
return this;
@@ -313,17 +245,13 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
assert newCapacity > 0;
this.capacity = newCapacity;
if (newCapacity < this.size()) {
- ListNode2<Bucket> tail = bucketList.tail;
+ ListNode2<Counter<T>> tail = counterList.tail();
while (tail != null && this.size() > newCapacity) {
- Bucket bucket = tail.getValue();
-
- for (Counter<T> counter : bucket.counterList) {
- this.counterMap.remove(counter.getItem());
- }
- tail = tail.getNext();
+ Counter<T> bucket = tail.getValue();
+ this.counterMap.remove(bucket.getItem());
+ this.counterList.remove(tail);
+ tail = this.counterList.tail();
}
-
- tail.next = null;
}
}
@@ -336,14 +264,13 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
double[] counters = new double[size()];
int index = 0;
- for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- counters[index] = c.count;
- index ++;
- }
+ for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
+ Counter<T> b = bNode.getValue();
+ counters[index] = b.count;
+ index++;
}
+ assert index == size();
return counters;
}
@@ -353,13 +280,12 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
*/
public List<T> getItems() {
List<T> items = Lists.newArrayList();
- for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
- Bucket b = bNode.getValue();
- for (Counter<T> c : b.counterList) {
- items.add(c.item);
- }
+ for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
+ Counter<T> b = bNode.getValue();
+ items.add(b.item);
}
+ assert items.size() == this.size();
return items;
}
@@ -368,46 +294,34 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public Iterator<Counter<T>> iterator() {
return new TopNCounterIterator();
}
-
+
+ /**
+ * Iterator from the tail (smallest) to head (biggest);
+ */
private class TopNCounterIterator implements Iterator {
- private ListNode2<Bucket> currentBNode;
- private Iterator<Counter<T>> currentCounterIterator;
-
+ private ListNode2<Counter<T>> currentBNode;
+
private TopNCounterIterator() {
- currentBNode = bucketList.head();
- if (currentBNode != null && currentBNode.getValue() != null) {
- currentCounterIterator = currentBNode.getValue().counterList.iterator();
- }
+ currentBNode = counterList.tail();
}
-
+
@Override
public boolean hasNext() {
- if (currentCounterIterator == null) {
- return false;
- }
-
- if (currentCounterIterator.hasNext()) {
- return true;
- }
+ return currentBNode != null;
- currentBNode = currentBNode.getPrev();
-
- if (currentBNode == null)
- return false;
-
- currentCounterIterator = currentBNode.getValue().counterList.iterator();
- return hasNext();
}
@Override
public Counter<T> next() {
- return currentCounterIterator.next();
+ Counter<T> counter = currentBNode.getValue();
+ currentBNode = currentBNode.getNext();
+ return counter;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 252e955..771df58 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -37,7 +37,7 @@ public class TopNCounterBasicTest {
List<Counter<String>> topk = vs.topK(6);
for (Counter<String> top : topk) {
- System.out.println(top.getItem() + ":" + top.getCount() + ":" + top.getError());
+ System.out.println(top.getItem() + ":" + top.getCount());
}
}
@@ -110,7 +110,7 @@ public class TopNCounterBasicTest {
public void testMerge() {
TopNCounter<String> vs = new TopNCounter<String>(10);
- String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B" };
+ String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B", "A" };
for (String i : stream) {
vs.offer(i);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
index a866289..d2d93b8 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterCombinationTest.java
@@ -29,7 +29,6 @@ public class TopNCounterCombinationTest extends TopNCounterTest {
@Parameterized.Parameters
public static Collection<Integer[]> configs() {
- // return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, });
return Arrays.asList(new Integer[][] {
// with 20X space
{ 10, 20 }, // top 10%
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
index dab45b1..a6aa610 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java
@@ -165,7 +165,7 @@ public class TopNCounterTest {
if (consumers.length == 1)
return consumers;
- TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM * PARALLEL);
+ TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
for (int i=0, n=consumers.length; i<n; i++) {
merged.vs.merge(consumers[i].vs);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
index fde179a..4b35a7a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -38,8 +38,8 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
@Override
public void aggregate(TopNCounter<ByteArray> value) {
if (sum == null) {
- sum = new TopNCounter<ByteArray>(Integer.MAX_VALUE);
capacity = value.getCapacity();
+ sum = new TopNCounter<ByteArray>(capacity);
}
sum.merge(value);
@@ -48,13 +48,13 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
@Override
public TopNCounter<ByteArray> getState() {
- sum.retain(capacity);
+ //sum.retain(capacity);
return sum;
}
@Override
public int getMemBytesEstimate() {
- return 8 * capacity;
+ return 8 * capacity / 4;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
index d4472f2..e0f293b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -18,8 +18,7 @@
package org.apache.kylin.metadata.measure.serializer;
-import com.google.common.collect.Lists;
-
+import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.DoubleDeltaSerializer;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
@@ -27,6 +26,7 @@ import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.model.DataType;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.List;
/**
@@ -83,15 +83,15 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
@Override
public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
double[] counters = value.getCounters();
- List<ByteArray> items = value.getItems();
- int keyLength = items.size() > 0 ? items.get(0).length() : 0;
+ List<ByteArray> peek = value.peek(1);
+ int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
out.putInt(value.getCapacity());
out.putInt(value.size());
out.putInt(keyLength);
dds.serialize(counters, out);
-
- for (ByteArray item : items) {
- out.put(item.array());
+ Iterator<Counter<ByteArray>> iterator = value.iterator();
+ while (iterator.hasNext()) {
+ out.put(iterator.next().getItem().array());
}
}
@@ -101,16 +101,15 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
int size = in.getInt();
int keyLength = in.getInt();
double[] counters = dds.deserialize(in);
- List<ByteArray> items = Lists.newArrayList();
+ TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+ ByteArray byteArray;
for (int i = 0; i < size; i++) {
- ByteArray byteArray = new ByteArray(keyLength);
+ byteArray = new ByteArray(keyLength);
in.get(byteArray.array());
- items.add(byteArray);
+ counter.offerToHead(byteArray, counters[i]);
}
- TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
- counter.fromExternal(size, counters, items);
return counter;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/examples/test_case_data/localmeta/data/data_gen_config.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/data_gen_config.json b/examples/test_case_data/localmeta/data/data_gen_config.json
index 2da72b7..ff3f676 100644
--- a/examples/test_case_data/localmeta/data/data_gen_config.json
+++ b/examples/test_case_data/localmeta/data/data_gen_config.json
@@ -26,6 +26,14 @@
"2000000"
],
"asRange": true
+ },
+ {
+ "columnName": "PRICE",
+ "valueSet": [
+ "0",
+ "1000"
+ ],
+ "asRange": true
}
]
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 03df875..35e2927 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -33,11 +33,11 @@ kylin.job.remote.cli.working.dir=/tmp/kylin
# Max count of concurrent jobs running
kylin.job.concurrent.max.limit=10
-# Whether calculate cube in mem in each mapper;
-kylin.job.cubing.inMem=true
+# Max reducer number
+kylin.job.mapreduce.max.reducer.number=5
-#the percentage of the sampling, default 25%
-kylin.job.cubing.inMem.sampling.percent=25
+#the percentage of the sampling, default 100%
+kylin.job.cubing.inMem.sampling.percent=100
# The cut size for hbase region, in GB.
# E.g, for cube whose capacity be marked as "SMALL", split region per 10GB by default
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 10f4661..e7c4cf5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -370,6 +370,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
htable.close();
int reducerNum = regions * 3;
+ reducerNum = Math.max(1, reducerNum);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
reducerNum = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), reducerNum);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1b2141d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 490c580..88cb7de 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -101,7 +101,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
}
}
- private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -117,6 +117,8 @@ public class StorageCleanupJob extends AbstractHadoopJob {
//only take care htables that belongs to self, and created more than 2 days
if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+ } else {
+ logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created");
}
}
}
[2/2] incubator-kylin git commit: KYLIN-1068 Optimize the memory
footprint for TopN counter
Posted by sh...@apache.org.
KYLIN-1068 Optimize the memory footprint for TopN counter
KYLIN-1068 update topN merge method, not change the to-be merged object
KYLIN-1068 add sanity check for TopNCounter
KYLIN-1068 sanityCheck for TopN
KYLIN-1068 fix a concurrent modification bug
KYLIN-1068 add debug info
KYLIN-1068 disable TopNCounter basic test
KYLIN-1068 add debug
KYLIN-1068 add debug
KYLIN-1068 add check in TopNAggregator and DoubleDeltaSerializer
KYLIN-1068 fix concurrency bug in DoubleDeltaSerializer and clean up debug code
KYLIN-1068 reuse array in DoubleDeltaSerializer to avoid OOM error
KYLIN-1068 increase reuseDelta size automatically
KYLIN-1068 remove the sanity check in TopNAggregator
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d0e0d9fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d0e0d9fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d0e0d9fc
Branch: refs/heads/2.x-staging
Commit: d0e0d9fc9e7c6377dcb2a0d9ae71cabcfa98cdcc
Parents: d1b2141
Author: shaofengshi <sh...@apache.org>
Authored: Mon Oct 12 22:05:32 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Oct 26 13:35:24 2015 +0800
----------------------------------------------------------------------
.../kylin/job/BuildCubeWithEngineTest.java | 14 +---
build/script/compress.sh | 19 ++---
.../common/topn/DoubleDeltaSerializer.java | 22 ++++--
.../kylin/common/topn/DoublyLinkedList.java | 11 +--
.../apache/kylin/common/topn/TopNCounter.java | 83 +++++++++++---------
.../kylin/common/topn/TopNCounterBasicTest.java | 1 +
.../cube/inmemcubing/InMemCubeBuilder.java | 33 +++++---
.../kylin/gridtable/GTAggregateScanner.java | 2 +-
.../kylin/metadata/measure/TopNAggregator.java | 10 ++-
.../measure/serializer/DataTypeSerializer.java | 2 +
.../serializer/TopNCounterSerializer.java | 2 +-
.../java/org/apache/kylin/rest/DebugTomcat.java | 2 +-
12 files changed, 113 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 26ad960..af3dc43 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -196,8 +196,8 @@ public class BuildCubeWithEngineTest {
long date2 = f.parse("2013-01-01").getTime();
long date3 = f.parse("2022-01-01").getTime();
List<String> result = Lists.newArrayList();
- result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3));
-// result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+ result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
+ result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
return result;
}
@@ -230,7 +230,6 @@ public class BuildCubeWithEngineTest {
// this cube's start date is 0, end date is 20120601000000
long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
- /*
long dateEnd = f.parse("2012-06-01").getTime();
clearSegment(cubeName);
@@ -238,20 +237,15 @@ public class BuildCubeWithEngineTest {
// then submit an append job, start date is 20120601000000, end
// date is 20220101000000
- dateStart = f.parse("2012-06-01").getTime();
+ dateStart = dateEnd;
dateEnd = f.parse("2022-01-01").getTime();
result.add(buildSegment(cubeName, dateStart, dateEnd));
// build an empty segment which doesn't have data
- dateStart = f.parse("2022-01-01").getTime();
+ dateStart = dateEnd;
dateEnd = f.parse("2023-01-01").getTime();
result.add(buildSegment(cubeName, dateStart, dateEnd));
-*/
- long dateEnd = f.parse("2023-01-01").getTime();
-
- clearSegment(cubeName);
- result.add(buildSegment(cubeName, dateStart, dateEnd));
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/build/script/compress.sh
----------------------------------------------------------------------
diff --git a/build/script/compress.sh b/build/script/compress.sh
index c70e567..cfd3c18 100755
--- a/build/script/compress.sh
+++ b/build/script/compress.sh
@@ -13,16 +13,17 @@ fi
#package tar.gz
echo 'package tar.gz'
+package_name=apache-kylin-${version}-bin
cd build/
-rm -rf kylin-${version}
-mkdir kylin-${version}
-cp -r lib bin conf tomcat ../examples/sample_cube commit_SHA1 kylin-${version}
+rm -rf ${package_name}
+mkdir ${package_name}
+cp -r lib bin conf tomcat ../examples/sample_cube commit_SHA1 ${package_name}
rm -rf lib tomcat commit_SHA1
-find kylin-${version} -type d -exec chmod 755 {} \;
-find kylin-${version} -type f -exec chmod 644 {} \;
-find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \;
+find ${package_name} -type d -exec chmod 755 {} \;
+find ${package_name} -type f -exec chmod 644 {} \;
+find ${package_name} -type f -name "*.sh" -exec chmod 755 {} \;
mkdir -p ../dist
-tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version}
-rm -rf kylin-${version}
+tar -cvzf ../dist/${package_name}.tar.gz ${package_name}
+rm -rf ${package_name}
-echo "Package ready: dist/kylin-${version}.tar.gz"
+echo "Package ready: dist/${package_name}.tar.gz"
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
index cd798e0..52dee77 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoubleDeltaSerializer.java
@@ -41,7 +41,8 @@ public class DoubleDeltaSerializer {
final private int precision;
final private int multiplier;
- transient long[] reuseDeltas;
+
+ transient ThreadLocal<long[]> deltasThreadLocal;
public DoubleDeltaSerializer() {
this(2);
@@ -55,6 +56,7 @@ public class DoubleDeltaSerializer {
this.precision = precision;
this.multiplier = (int) Math.pow(10, precision);
+ this.deltasThreadLocal = new ThreadLocal<long[]>();
}
public void serialize(double values[], ByteBuffer buf) {
@@ -110,21 +112,24 @@ public class DoubleDeltaSerializer {
private long[] calculateDeltas(double[] values) {
int len = values.length - 1;
len = Math.max(0, len);
- if (reuseDeltas == null || reuseDeltas.length < len) {
- reuseDeltas = new long[len];
+
+ long[] deltas = deltasThreadLocal.get();
+ if (deltas == null || deltas.length < len) {
+ deltas = new long[len];
+ deltasThreadLocal.set(deltas);
}
-
+
if (len == 0)
- return reuseDeltas;
+ return deltas;
long current = roundAndPromote(values[0]);
for (int i = 0; i < len; i++) {
long next = roundAndPromote(values[i + 1]);
- reuseDeltas[i] = next - current;
- assert reuseDeltas[i] >= 0;
+ deltas[i] = next - current;
+ assert deltas[i] >= 0;
current = next;
}
- return reuseDeltas;
+ return deltas;
}
private long roundAndPromote(double value) {
@@ -142,6 +147,7 @@ public class DoubleDeltaSerializer {
double[] result = new double[len];
deserialize(buf, meta, result);
+
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index d268a30..1520ce1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
*/
public class DoublyLinkedList<T> {
- private int size;
+ private int size = 0;
private ListNode2<T> tail;
private ListNode2<T> head;
@@ -37,14 +37,7 @@ public class DoublyLinkedList<T> {
*/
public ListNode2<T> add(T value) {
ListNode2<T> node = new ListNode2<T>(value);
- if (size++ == 0) {
- tail = node;
- } else {
- node.prev = head;
- head.next = node;
- }
-
- head = node;
+ add(node);
return node;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 1856010..e6c5c90 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -19,6 +19,7 @@
package org.apache.kylin.common.topn;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.kylin.common.util.Pair;
import java.util.*;
@@ -116,20 +117,40 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
Counter<T> counter = counterNode.getValue();
counter.count += incrementCount;
- ListNode2<Counter<T>> nodeNext = counterNode.getNext();
+ ListNode2<Counter<T>> nodeNext;
+
+ if (incrementCount > 0) {
+ nodeNext = counterNode.getNext();
+ } else {
+ nodeNext = counterNode.getPrev();
+ }
counterList.remove(counterNode);
counterNode.prev = null;
counterNode.next = null;
- while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
- nodeNext = nodeNext.getNext();
- }
- if (nodeNext != null) {
- counterList.addBefore(nodeNext, counterNode);
+ if (incrementCount > 0) {
+ while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
+ nodeNext = nodeNext.getNext();
+ }
+ if (nodeNext != null) {
+ counterList.addBefore(nodeNext, counterNode);
+ } else {
+ counterList.add(counterNode);
+ }
+
} else {
- counterList.add(counterNode);
+ while (nodeNext != null && counter.count < nodeNext.getValue().count) {
+ nodeNext = nodeNext.getPrev();
+ }
+ if (nodeNext != null) {
+ counterList.addAfter(nodeNext, counterNode);
+ } else {
+ counterList.enqueue(counterNode);
+ }
}
+
+
}
@Override
@@ -196,13 +217,7 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
}
/**
- * For de-serialization
- */
- public TopNCounter() {
- }
-
- /**
- * Merge another counter into this counter; Note, the other counter will be changed in this method; please make a copy and passed in here;
+ * Merge another counter into this counter;
* @param another
* @return
*/
@@ -216,22 +231,33 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
m2 = another.counterList.tail().getValue().count;
}
+ Set<T> duplicateItems = Sets.newHashSet();
+ List<T> notDuplicateItems = Lists.newArrayList();
+
for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
T item = entry.getKey();
ListNode2<Counter<T>> existing = another.counterMap.get(item);
if (existing != null) {
- this.offer(item, another.counterMap.get(item).getValue().count);
-
- another.counterMap.remove(item);
+ duplicateItems.add(item);
} else {
- this.offer(item, m2);
+ notDuplicateItems.add(item);
}
}
+ for(T item : duplicateItems) {
+ this.offer(item, another.counterMap.get(item).getValue().count);
+ }
+
+ for(T item : notDuplicateItems) {
+ this.offer(item, m2);
+ }
+
for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
T item = entry.getKey();
- double counter = entry.getValue().getValue().count;
- this.offer(item, counter + m1);
+ if (duplicateItems.contains(item) == false) {
+ double counter = entry.getValue().getValue().count;
+ this.offer(item, counter + m1);
+ }
}
return this;
@@ -274,22 +300,6 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
return counters;
}
- /**
- * Get the item list order by counter values in ascending order
- * @return
- */
- public List<T> getItems() {
- List<T> items = Lists.newArrayList();
- for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
- Counter<T> b = bNode.getValue();
- items.add(b.item);
- }
-
- assert items.size() == this.size();
- return items;
-
- }
-
@Override
public Iterator<Counter<T>> iterator() {
return new TopNCounterIterator();
@@ -324,4 +334,5 @@ public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
throw new UnsupportedOperationException();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
index 771df58..c3e941b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterBasicTest.java
@@ -16,6 +16,7 @@
package org.apache.kylin.common.topn;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 1666fc2..7d943a3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -30,10 +30,9 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.*;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -463,10 +462,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
mask = mask >> 1;
}
- return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns);
+ return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, childDimensions, measureColumns);
}
- private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+ private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("Calculating cuboid " + cuboidId);
@@ -489,10 +488,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
builder.write(newRecord);
}
- // disable sanity check for performance
- long t = System.currentTimeMillis();
- sanityCheck(scanner.getTotalSumForSanityCheck());
- logger.info("sanity check for Cuboid " + cuboidId + " cost " + (System.currentTimeMillis() - t) + "ms");
+ //long t = System.currentTimeMillis();
+ //sanityCheck(parentId, cuboidId, scanner.getTotalSumForSanityCheck());
+ //logger.info("sanity check for Cuboid " + cuboidId + " cost " + (System.currentTimeMillis() - t) + "ms");
} finally {
scanner.close();
builder.close();
@@ -505,12 +503,22 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
//@SuppressWarnings("unused")
- private void sanityCheck(Object[] totalSum) {
+ private void sanityCheck(long parentId, long cuboidId, Object[] totalSum) {
// double sum introduces error and causes result not exactly equal
for (int i = 0; i < totalSum.length; i++) {
if (totalSum[i] instanceof DoubleMutable) {
totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get());
+ } else if (totalSum[i] instanceof TopNCounter) {
+ TopNCounter counter = (TopNCounter) totalSum[i];
+ Iterator<Counter> iterator = counter.iterator();
+ double total = 0.0;
+ while (iterator.hasNext()) {
+ Counter aCounter = iterator.next();
+ total += aCounter.getCount();
+ }
+ totalSum[i] = Math.round(total);
}
+
}
if (totalSumForSanityCheck == null) {
@@ -518,6 +526,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return;
}
if (Arrays.equals(totalSumForSanityCheck, totalSum) == false) {
+ logger.info("sanityCheck failed when calculate " + cuboidId + " from parent " + parentId);
+ logger.info("Expected: " + Arrays.toString(totalSumForSanityCheck));
+ logger.info("Actually: " + Arrays.toString(totalSum));
throw new IllegalStateException();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 9050c49..193a05c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -167,7 +167,7 @@ public class GTAggregateScanner implements IGTScanner {
// skip expensive aggregation
for (int i = 0; i < totalSum.length; i++) {
- if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator || totalSum[i] instanceof TopNAggregator )
+ if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator || totalSum[i] instanceof TopNAggregator)
totalSum[i] = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
index 4b35a7a..9c21f7b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/TopNAggregator.java
@@ -18,9 +18,14 @@
package org.apache.kylin.metadata.measure;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
+import java.util.Iterator;
+import java.util.Map;
+
/**
*
*/
@@ -29,6 +34,7 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
int capacity = 0;
TopNCounter<ByteArray> sum = null;
+ Map<ByteArray, Double> sanityCheckMap;
@Override
public void reset() {
@@ -39,9 +45,9 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
public void aggregate(TopNCounter<ByteArray> value) {
if (sum == null) {
capacity = value.getCapacity();
- sum = new TopNCounter<ByteArray>(capacity);
+ sum = new TopNCounter<>(capacity);
+ sanityCheckMap = Maps.newHashMap();
}
-
sum.merge(value);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
index d5dc43e..d542098 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Maps;
/**
* @author yangli9
*
+ * Note: the implementations MUST be thread-safe.
+ *
*/
abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
index e0f293b..468d077 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java
@@ -34,7 +34,7 @@ import java.util.List;
*/
public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
- private DoubleDeltaSerializer dds = new DoubleDeltaSerializer();
+ private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
private int precision;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d0e0d9fc/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 18730ef..0b7aa70 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -109,7 +109,7 @@ public class DebugTomcat {
String webBase = new File("../webapp/app").getAbsolutePath();
if (new File(webBase, "WEB-INF").exists() == false) {
- throw new RuntimeException("In order to launch Kylin web app from IDE, please copy server/src/main/webapp/WEB-INF to webapp/app/WEB-INF");
+ throw new RuntimeException("In order to launch Kylin web app from IDE, please copy server/src/main/webapp/WEB-INF to webapp/app/");
}
Tomcat tomcat = new Tomcat();