You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/30 04:33:30 UTC

incubator-kylin git commit: KYLIN-839 Optimize Snapshot table memory usage

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 194f2df3f -> 37b9480b2


KYLIN-839 Optimize Snapshot table memory usage

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/37b9480b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/37b9480b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/37b9480b

Branch: refs/heads/0.8
Commit: 37b9480b2a371b341ebbc0c7f1887a50be4f0f84
Parents: 194f2df
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jun 30 10:33:17 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jun 30 10:33:17 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/lookup/SnapshotTable.java | 150 ++++++++++++++-----
 1 file changed, 109 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/37b9480b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 4a715b4..822a9f4 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,24 +18,27 @@
 
 package org.apache.kylin.dict.lookup;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.Path;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
 import org.apache.kylin.metadata.model.TableDesc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * @author yangli9
  */
@@ -46,8 +49,11 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
     private TableSignature signature;
     @JsonProperty("column_delimeter")
     private String columnDelimeter;
+    @JsonProperty("useDictionary")
+    private boolean useDictionary;
 
-    private ArrayList<String[]> rows;
+    private ArrayList<int[]> rowIndices;
+    private Dictionary<String> dict;
 
     // default constructor for JSON serialization
     public SnapshotTable() {
@@ -56,6 +62,7 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
     SnapshotTable(ReadableTable table) throws IOException {
         this.signature = table.getSignature();
         this.columnDelimeter = table.getColumnDelimeter();
+        this.useDictionary = true;
     }
 
     public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
@@ -64,16 +71,33 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
 
         int maxIndex = tableDesc.getMaxColumnIndex();
 
+        TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
+
         TableReader reader = table.getReader();
-        ArrayList<String[]> allRows = new ArrayList<String[]>();
         while (reader.next()) {
             String[] row = reader.getRow();
             if (row.length <= maxIndex) {
                 throw new IllegalStateException("Bad hive table row, " + tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + Arrays.toString(row));
             }
-            allRows.add(row);
+
+            for (String cell : row) {
+                b.addValue(cell);
+            }
+        }
+
+        this.dict = b.build(0);
+
+        reader = table.getReader();
+        ArrayList<int[]> allRowIndices = new ArrayList<int[]>();
+        while (reader.next()) {
+            String[] row = reader.getRow();
+            int[] rowIndex = new int[row.length];
+            for (int i = 0; i < row.length; i++) {
+                rowIndex[i] = dict.getIdFromValue(row[i]);
+            }
+            allRowIndices.add(rowIndex);
         }
-        this.rows = allRows;
+        this.rowIndices = allRowIndices;
     }
 
     public String getResourcePath() {
@@ -93,12 +117,17 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
             @Override
             public boolean next() throws IOException {
                 i++;
-                return i < rows.size();
+                return i < rowIndices.size();
             }
 
             @Override
             public String[] getRow() {
-                return rows.get(i);
+                int[] rowIndex = rowIndices.get(i);
+                String[] row = new String[rowIndex.length];
+                for (int x = 0; x < row.length; x++) {
+                    row[x] = dict.getValueFromId(rowIndex[x]);
+                }
+                return row;
             }
 
             @Override
@@ -129,9 +158,9 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
      */
     @Override
     public int hashCode() {
-        int[] parts = new int[this.rows.size()];
+        int[] parts = new int[this.rowIndices.size()];
         for (int i = 0; i < parts.length; ++i)
-            parts[i] = Arrays.hashCode(this.rows.get(i));
+            parts[i] = Arrays.hashCode(this.rowIndices.get(i));
         return Arrays.hashCode(parts);
     }
 
@@ -142,10 +171,10 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
         SnapshotTable that = (SnapshotTable) o;
 
         //compare row by row
-        if (this.rows.size() != that.rows.size())
+        if (this.rowIndices.size() != that.rowIndices.size())
             return false;
-        for (int i = 0; i < this.rows.size(); ++i) {
-            if (!ArrayUtils.isEquals(this.rows.get(i), that.rows.get(i)))
+        for (int i = 0; i < this.rowIndices.size(); ++i) {
+            if (!ArrayUtils.isEquals(this.rowIndices.get(i), that.rowIndices.get(i)))
                 return false;
         }
         return true;
@@ -162,15 +191,27 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
     }
 
     void writeData(DataOutput out) throws IOException {
-        out.writeInt(rows.size());
-        if (rows.size() > 0) {
-            int n = rows.get(0).length;
+        out.writeInt(rowIndices.size());
+        if (rowIndices.size() > 0) {
+            int n = rowIndices.get(0).length;
             out.writeInt(n);
-            for (int i = 0; i < rows.size(); i++) {
-                String[] row = rows.get(i);
-                for (int j = 0; j < n; j++) {
-                    // NULL_STR is tricky, but we don't want to break the current snapshots
-                    out.writeUTF(row[j] == null ? NULL_STR : row[j]);
+
+            if (this.useDictionary == true) {
+                dict.write(out);
+                for (int i = 0; i < rowIndices.size(); i++) {
+                    int[] row = rowIndices.get(i);
+                    for (int j = 0; j < n; j++) {
+                        out.writeInt(row[j]);
+                    }
+                }
+
+            } else {
+                for (int i = 0; i < rowIndices.size(); i++) {
+                    int[] row = rowIndices.get(i);
+                    for (int j = 0; j < n; j++) {
+                        // NULL_STR is tricky, but we don't want to break the current snapshots
+                        out.writeUTF(dict.getValueFromId(row[j]) == null ? NULL_STR : dict.getValueFromId(row[j]));
+                    }
                 }
             }
         }
@@ -178,17 +219,44 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
 
     void readData(DataInput in) throws IOException {
         int rowNum = in.readInt();
-        rows = new ArrayList<String[]>(rowNum);
         if (rowNum > 0) {
             int n = in.readInt();
-            for (int i = 0; i < rowNum; i++) {
-                String[] row = new String[n];
-                rows.add(row);
-                for (int j = 0; j < n; j++) {
-                    row[j] = in.readUTF();
-                    // NULL_STR is tricky, but we don't want to break the current snapshots
-                    if (row[j].equals(NULL_STR))
-                        row[j] = null;
+            rowIndices = new ArrayList<int[]>(rowNum);
+
+            if (this.useDictionary == true) {
+                this.dict = new TrieDictionary<String>();
+                dict.readFields(in);
+
+                for (int i = 0; i < rowNum; i++) {
+                    int[] row = new int[n];
+                    this.rowIndices.add(row);
+                    for (int j = 0; j < n; j++) {
+                        row[j] = in.readInt();
+                    }
+                }
+            } else {
+                List<String[]> rows = new ArrayList<String[]>(rowNum);
+                TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
+
+                for (int i = 0; i < rowNum; i++) {
+                    String[] row = new String[n];
+                    rows.add(row);
+                    for (int j = 0; j < n; j++) {
+                        row[j] = in.readUTF();
+                        // NULL_STR is tricky, but we don't want to break the current snapshots
+                        if (row[j].equals(NULL_STR))
+                            row[j] = null;
+                        
+                        b.addValue(row[j]);
+                    }
+                }
+                this.dict = b.build(0);
+                for (String[] row : rows) {
+                    int[] rowIndex = new int[n];
+                    for (int i = 0; i < n; i++) {
+                        rowIndex[i] = dict.getIdFromValue(row[i]);
+                    }
+                    this.rowIndices.add(rowIndex);
                 }
             }
         }