You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/13 07:35:39 UTC

[2/2] incubator-kylin git commit: KYLIN-877 Simplify interface ReadableTable, get ready for hive abstraction

KYLIN-877 Simplify interface ReadableTable, get ready for hive abstraction


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/988f1369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/988f1369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/988f1369

Branch: refs/heads/0.8
Commit: 988f136917d0d489769a2c62126f4ac1e53c8df8
Parents: bbd5e27
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 13 13:33:48 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 13 13:33:48 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/DictionaryGenerator.java  |  18 +--
 .../org/apache/kylin/dict/DictionaryInfo.java   |  19 +---
 .../apache/kylin/dict/DictionaryManager.java    |  26 +++--
 .../org/apache/kylin/dict/lookup/FileTable.java |  15 +--
 .../kylin/dict/lookup/FileTableReader.java      |  13 +--
 .../org/apache/kylin/dict/lookup/HiveTable.java |  10 --
 .../kylin/dict/lookup/HiveTableReader.java      |   8 +-
 .../apache/kylin/dict/lookup/LookupTable.java   |   2 +
 .../apache/kylin/dict/lookup/ReadableTable.java | 106 +++++++++++++++++-
 .../kylin/dict/lookup/SnapshotManager.java      |   2 +-
 .../apache/kylin/dict/lookup/SnapshotTable.java |  19 ----
 .../apache/kylin/dict/lookup/TableReader.java   |  36 ------
 .../kylin/dict/lookup/TableSignature.java       | 111 -------------------
 .../kylin/dict/ITSnapshotManagerTest.java       |   7 +-
 .../org/apache/kylin/dict/TableReaderTest.java  |   5 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  11 +-
 .../kylin/job/streaming/CubeStreamConsumer.java |  43 ++++---
 .../job/hadoop/cube/MergeCuboidMapperTest.java  |  28 +++--
 18 files changed, 188 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 3d79ffa..60b2a8d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -18,23 +18,27 @@
 
 package org.apache.kylin.dict;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableReader;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 import org.apache.kylin.metadata.model.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index b6109c7..802f6e7 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -21,9 +21,10 @@ package org.apache.kylin.dict;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class DictionaryInfo extends RootPersistentEntity {
@@ -38,8 +39,6 @@ public class DictionaryInfo extends RootPersistentEntity {
     private String dataType;
     @JsonProperty("input")
     private TableSignature input;
-    @JsonProperty("input_delimeter")
-    private String inputDelimeter;
     @JsonProperty("dictionary_class")
     private String dictionaryClass;
     @JsonProperty("cardinality")
@@ -50,7 +49,7 @@ public class DictionaryInfo extends RootPersistentEntity {
     public DictionaryInfo() {
     }
 
-    public DictionaryInfo(String sourceTable, String sourceColumn, int sourceColumnIndex, String dataType, TableSignature input, String inputDelimeter) {
+    public DictionaryInfo(String sourceTable, String sourceColumn, int sourceColumnIndex, String dataType, TableSignature input) {
 
         this.updateRandomUuid();
 
@@ -59,7 +58,6 @@ public class DictionaryInfo extends RootPersistentEntity {
         this.sourceColumnIndex = sourceColumnIndex;
         this.dataType = dataType;
         this.input = input;
-        this.inputDelimeter = inputDelimeter;
     }
 
     public DictionaryInfo(DictionaryInfo other) {
@@ -71,7 +69,6 @@ public class DictionaryInfo extends RootPersistentEntity {
         this.sourceColumnIndex = other.sourceColumnIndex;
         this.dataType = other.dataType;
         this.input = other.input;
-        this.inputDelimeter = other.inputDelimeter;
     }
 
     // ----------------------------------------------------------------------------
@@ -89,7 +86,7 @@ public class DictionaryInfo extends RootPersistentEntity {
     // to decide if two dictionaries are built on the same table/column,
     // regardless of their signature
     public boolean isDictOnSameColumn(DictionaryInfo other) {
-        return this.sourceTable.equalsIgnoreCase(other.sourceTable) && this.sourceColumn.equalsIgnoreCase(other.sourceColumn) && this.sourceColumnIndex == other.sourceColumnIndex && this.dataType.equalsIgnoreCase(other.dataType) && this.inputDelimeter.equalsIgnoreCase(other.inputDelimeter) && this.dictionaryClass.equalsIgnoreCase(other.dictionaryClass);
+        return this.sourceTable.equalsIgnoreCase(other.sourceTable) && this.sourceColumn.equalsIgnoreCase(other.sourceColumn) && this.sourceColumnIndex == other.sourceColumnIndex && this.dataType.equalsIgnoreCase(other.dataType) && this.dictionaryClass.equalsIgnoreCase(other.dictionaryClass);
     }
 
     public String getSourceTable() {
@@ -132,14 +129,6 @@ public class DictionaryInfo extends RootPersistentEntity {
         this.input = input;
     }
 
-    public String getInputDelimeter() {
-        return inputDelimeter;
-    }
-
-    public void setInputDelimeter(String inputDelimeter) {
-        this.inputDelimeter = inputDelimeter;
-    }
-
     public String getDictionaryClass() {
         return dictionaryClass;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 0ba1ed9..2e1e3db 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,7 +18,15 @@
 
 package org.apache.kylin.dict;
 
-import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,7 +37,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.dict.lookup.FileTable;
 import org.apache.kylin.dict.lookup.HiveTable;
 import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.DataType;
@@ -37,14 +45,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Lists;
 
 public class DictionaryManager {
 
@@ -204,10 +205,11 @@ public class DictionaryManager {
         int srcColIdx = (Integer) tmp[2];
         ReadableTable inpTable = (ReadableTable) tmp[3];
 
-        if (!inpTable.exists())
+        TableSignature inputSig = inpTable.getSignature();
+        if (inputSig == null) // table does not exists
             return null;
 
-        DictionaryInfo dictInfo = new DictionaryInfo(srcTable, srcCol, srcColIdx, col.getDatatype(), inpTable.getSignature(), inpTable.getColumnDelimeter());
+        DictionaryInfo dictInfo = new DictionaryInfo(srcTable, srcCol, srcColIdx, col.getDatatype(), inputSig);
 
         String dupDict = checkDupByInfo(dictInfo);
         if (dupDict != null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index 0677860..139761e 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -23,15 +23,15 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.kylin.common.util.HadoopUtil;
 
 /**
- * @author yangli9
- * 
  */
 public class FileTable implements ReadableTable {
 
+    public static final String DELIM_AUTO = "auto";
+    public static final String DELIM_COMMA = ",";
+
     String path;
     String delim;
     int nColumns;
@@ -46,26 +46,17 @@ public class FileTable implements ReadableTable {
         this.nColumns = nColumns;
     }
 
-    @Override
     public String getColumnDelimeter() {
         return delim;
     }
 
     @Override
-    public boolean exists() throws IOException {
-        FileSystem fs = HadoopUtil.getFileSystem(path);
-        return fs.exists(new Path(path));
-    }
-
-    @Override
     public TableReader getReader() throws IOException {
         return new FileTableReader(path, delim, nColumns);
     }
 
     @Override
     public TableSignature getSignature() throws IOException {
-        if (!exists())
-            throw new IllegalStateException("Table not exists");
         FileSystem fs = HadoopUtil.getFileSystem(path);
         FileStatus status = fs.getFileStatus(new Path(path));
         return new TableSignature(path, status.getLen(), status.getModificationTime());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
index 3631b25..bf46963 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 
 /**
  * Tables are typically CSV or SEQ file.
@@ -61,7 +61,7 @@ public class FileTableReader implements TableReader {
     private int expectedColumnNumber = -1; // helps delimiter detection
 
     public FileTableReader(String filePath, int expectedColumnNumber) throws IOException {
-        this(filePath, ReadableTable.DELIM_AUTO, expectedColumnNumber);
+        this(filePath, FileTable.DELIM_AUTO, expectedColumnNumber);
     }
     
     public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
@@ -94,11 +94,6 @@ public class FileTableReader implements TableReader {
     }
 
     @Override
-    public void setExpectedColumnNumber(int expectedColumnNumber) {
-        this.expectedColumnNumber = expectedColumnNumber;
-    }
-
-    @Override
     public boolean next() throws IOException {
         curLine = reader.nextLine();
         curColumns = null;
@@ -112,7 +107,7 @@ public class FileTableReader implements TableReader {
     @Override
     public String[] getRow() {
         if (curColumns == null) {
-            if (ReadableTable.DELIM_AUTO.equals(delim))
+            if (FileTable.DELIM_AUTO.equals(delim))
                 delim = autoDetectDelim(curLine);
 
             if (delim == null)
@@ -128,7 +123,7 @@ public class FileTableReader implements TableReader {
         String str[] = StringSplitter.split(line, delim);
 
         // un-escape CSV
-        if (ReadableTable.DELIM_COMMA.equals(delim)) {
+        if (FileTable.DELIM_COMMA.equals(delim)) {
             for (int i = 0; i < str.length; i++) {
                 str[i] = unescapeCsv(str[i]);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
index c634647..cbd3b04 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
@@ -57,16 +57,6 @@ public class HiveTable implements ReadableTable {
     }
 
     @Override
-    public String getColumnDelimeter() throws IOException {
-        return getFileTable().getColumnDelimeter();
-    }
-
-    @Override
-    public boolean exists() throws IOException {
-        return true;
-    }
-
-    @Override
     public TableReader getReader() throws IOException {
         return new HiveTableReader(database, hiveTable);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
index c2af155..96aa0d1 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
@@ -33,11 +33,10 @@ import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
 import org.apache.hive.hcatalog.data.transfer.HCatReader;
 import org.apache.hive.hcatalog.data.transfer.ReadEntity;
 import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 
 /**
  * An implementation of TableReader with HCatalog for Hive table.
- * @author shaoshi
- *
  */
 public class HiveTableReader implements TableReader {
 
@@ -135,11 +134,6 @@ public class HiveTableReader implements TableReader {
     }
 
     @Override
-    public void setExpectedColumnNumber(int expectedColumnNumber) {
-
-    }
-
-    @Override
     public void close() throws IOException {
         this.readCntxt = null;
         this.currentHCatRecordItr = null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 50d4cbc..7e83197 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.kylin.common.util.Pair;
 
 import com.google.common.collect.Sets;
+
 import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
index 3fe6773..2e6af14 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
@@ -18,22 +18,116 @@
 
 package org.apache.kylin.dict.lookup;
 
+import java.io.Closeable;
 import java.io.IOException;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
 /**
- * @author yangli9
  */
 public interface ReadableTable {
 
-    public static final String DELIM_AUTO = "auto";
-    public static final String DELIM_COMMA = ",";
-
+    /** Returns a reader to read the table. */
     public TableReader getReader() throws IOException;
 
+    /** Used to detect table modifications mainly. Return null in case table does not exist. */
     public TableSignature getSignature() throws IOException;
 
-    public String getColumnDelimeter() throws IOException;
+    public interface TableReader extends Closeable {
+
+        /** Move to the next row, return false if no more record. */
+        public boolean next() throws IOException;
+
+        /** Get the current row. */
+        public String[] getRow();
+        
+    }
+    
+    // ============================================================================
+    
+    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+    public class TableSignature {
+
+        @JsonProperty("path")
+        private String path;
+        @JsonProperty("size")
+        private long size;
+        @JsonProperty("last_modified_time")
+        private long lastModifiedTime;
+
+        // for JSON serialization
+        public TableSignature() {
+        }
+
+        public TableSignature(String path, long size, long lastModifiedTime) {
+            super();
+            this.path = path;
+            this.size = size;
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setLastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public long getLastModifiedTime() {
+            return lastModifiedTime;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
+            result = prime * result + ((path == null) ? 0 : path.hashCode());
+            result = prime * result + (int) (size ^ (size >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            TableSignature other = (TableSignature) obj;
+            if (lastModifiedTime != other.lastModifiedTime)
+                return false;
+            if (path == null) {
+                if (other.path != null)
+                    return false;
+            } else if (!path.equals(other.path))
+                return false;
+            if (size != other.size)
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
+        }
 
-    public boolean exists() throws IOException;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index a2f6d5d..3b6db77 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -24,9 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 90cb5dd..aa46212 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -47,8 +47,6 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
 
     @JsonProperty("signature")
     private TableSignature signature;
-    @JsonProperty("column_delimeter")
-    private String columnDelimeter;
     @JsonProperty("useDictionary")
     private boolean useDictionary;
 
@@ -61,13 +59,11 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
 
     SnapshotTable(ReadableTable table) throws IOException {
         this.signature = table.getSignature();
-        this.columnDelimeter = table.getColumnDelimeter();
         this.useDictionary = true;
     }
 
     public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
         this.signature = table.getSignature();
-        this.columnDelimeter = table.getColumnDelimeter();
 
         int maxIndex = tableDesc.getMaxColumnIndex();
 
@@ -134,11 +130,6 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
             @Override
             public void close() throws IOException {
             }
-
-            @Override
-            public void setExpectedColumnNumber(int expectedColumnNumber) {
-                // noop
-            }
         };
     }
 
@@ -147,16 +138,6 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
         return signature;
     }
 
-    @Override
-    public String getColumnDelimeter() throws IOException {
-        return columnDelimeter;
-    }
-
-    @Override
-    public boolean exists() throws IOException {
-        return true;
-    }
-
     /**
      * a naive implementation
      *

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java
deleted file mode 100644
index e31ff96..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Tables are typically CSV or SEQ file.
- * 
- * @author yangli9
- */
-public interface TableReader extends Closeable {
-
-    public boolean next() throws IOException;
-
-    public String[] getRow();
-
-    public void setExpectedColumnNumber(int expectedColumnNumber);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java
deleted file mode 100644
index b9ecc71..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * @author yangli9
- */
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class TableSignature {
-
-    @JsonProperty("path")
-    private String path;
-    @JsonProperty("size")
-    private long size;
-    @JsonProperty("last_modified_time")
-    private long lastModifiedTime;
-
-    // for JSON serialization
-    public TableSignature() {
-    }
-
-    public TableSignature(String path, long size, long lastModifiedTime) {
-        super();
-        this.path = path;
-        this.size = size;
-        this.lastModifiedTime = lastModifiedTime;
-    }
-
-    public void setPath(String path) {
-        this.path = path;
-    }
-
-    public void setSize(long size) {
-        this.size = size;
-    }
-
-    public void setLastModifiedTime(long lastModifiedTime) {
-        this.lastModifiedTime = lastModifiedTime;
-    }
-
-    public String getPath() {
-        return path;
-    }
-
-    public long getSize() {
-        return size;
-    }
-
-    public long getLastModifiedTime() {
-        return lastModifiedTime;
-    }
-
-    // ============================================================================
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
-        result = prime * result + ((path == null) ? 0 : path.hashCode());
-        result = prime * result + (int) (size ^ (size >>> 32));
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        TableSignature other = (TableSignature) obj;
-        if (lastModifiedTime != other.lastModifiedTime)
-            return false;
-        if (path == null) {
-            if (other.path != null)
-                return false;
-        } else if (!path.equals(other.path))
-            return false;
-        if (size != other.size)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
index e59fcda..7fc37a6 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
@@ -18,20 +18,19 @@
 
 package org.apache.kylin.dict;
 
+import static org.junit.Assert.*;
+
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.dict.lookup.HiveTable;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.dict.lookup.TableReader;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
 /**
  * @author yangli9
  * 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java b/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
index 2650aad..cfecaee 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
@@ -24,11 +24,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.kylin.dict.lookup.FileTable;
 import org.apache.kylin.dict.lookup.FileTableReader;
 import org.junit.Test;
 
-import org.apache.kylin.dict.lookup.ReadableTable;
-
 /**
  * @author yangli9
  * 
@@ -38,7 +37,7 @@ public class TableReaderTest {
     @Test
     public void testBasicReader() throws IOException {
         File f = new File("src/test/resources/dict/DW_SITES");
-        FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), ReadableTable.DELIM_AUTO, 10);
+        FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), FileTable.DELIM_AUTO, 10);
         while (reader.next()) {
             assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
             break;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index da587db..5ec963e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -18,11 +18,10 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import java.io.IOException;
+
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -32,19 +31,14 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * @author yangli9
  */
@@ -80,7 +74,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-            CubeSegment newSegment = cubeInstance.getSegment(segmentName, SegmentStatusEnum.NEW);
 
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index e595089..980b375 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -1,13 +1,20 @@
 package org.apache.kylin.job.streaming;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,8 +43,7 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
 import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
@@ -53,13 +59,14 @@ import org.apache.kylin.streaming.MicroStreamBatchConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 
 /**
  */
@@ -130,7 +137,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
             signature.setLastModifiedTime(System.currentTimeMillis());
             signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
             signature.setSize(endOffset - startOffset);
-            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(), tblColRef.getName(), tblColRef.getColumnDesc().getZeroBasedIndex(), tblColRef.getDatatype(), signature, ReadableTable.DELIM_AUTO);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(), tblColRef.getName(), tblColRef.getColumnDesc().getZeroBasedIndex(), tblColRef.getDatatype(), signature);
             logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
             DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
             try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 01e57d5..5886324 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
@@ -26,8 +34,12 @@ import org.apache.kylin.cube.CubeBuilder;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.dict.*;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -38,14 +50,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
 /**
  * @author honma
  */
@@ -69,7 +73,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
         signature.setLastModifiedTime(System.currentTimeMillis());
         signature.setPath("fake_common_dict");
 
-        DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", signature, "");
+        DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", signature);
 
         List<byte[]> values = new ArrayList<byte[]>();
         values.add(new byte[] { 101, 101, 101 });
@@ -119,7 +123,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
             signature.setLastModifiedTime(System.currentTimeMillis());
             signature.setPath("fake_dict_for" + lfn.getName() + segment.getName());
 
-            DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getTable(), lfn.getColumnDesc().getName(), lfn.getColumnDesc().getZeroBasedIndex(), "string", signature, "");
+            DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getTable(), lfn.getColumnDesc().getName(), lfn.getColumnDesc().getZeroBasedIndex(), "string", signature);
 
             List<byte[]> values = new ArrayList<byte[]>();
             values.add(new byte[] { 97, 97, 97 });