You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/13 07:35:39 UTC
[2/2] incubator-kylin git commit: KYLIN-877 Simplify interface
ReadableTable, get ready for hive abstraction
KYLIN-877 Simplify interface ReadableTable, get ready for hive abstraction
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/988f1369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/988f1369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/988f1369
Branch: refs/heads/0.8
Commit: 988f136917d0d489769a2c62126f4ac1e53c8df8
Parents: bbd5e27
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 13 13:33:48 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 13 13:33:48 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/dict/DictionaryGenerator.java | 18 +--
.../org/apache/kylin/dict/DictionaryInfo.java | 19 +---
.../apache/kylin/dict/DictionaryManager.java | 26 +++--
.../org/apache/kylin/dict/lookup/FileTable.java | 15 +--
.../kylin/dict/lookup/FileTableReader.java | 13 +--
.../org/apache/kylin/dict/lookup/HiveTable.java | 10 --
.../kylin/dict/lookup/HiveTableReader.java | 8 +-
.../apache/kylin/dict/lookup/LookupTable.java | 2 +
.../apache/kylin/dict/lookup/ReadableTable.java | 106 +++++++++++++++++-
.../kylin/dict/lookup/SnapshotManager.java | 2 +-
.../apache/kylin/dict/lookup/SnapshotTable.java | 19 ----
.../apache/kylin/dict/lookup/TableReader.java | 36 ------
.../kylin/dict/lookup/TableSignature.java | 111 -------------------
.../kylin/dict/ITSnapshotManagerTest.java | 7 +-
.../org/apache/kylin/dict/TableReaderTest.java | 5 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 11 +-
.../kylin/job/streaming/CubeStreamConsumer.java | 43 ++++---
.../job/hadoop/cube/MergeCuboidMapperTest.java | 28 +++--
18 files changed, 188 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 3d79ffa..60b2a8d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -18,23 +18,27 @@
package org.apache.kylin.dict;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableReader;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
import org.apache.kylin.metadata.model.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index b6109c7..802f6e7 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -21,9 +21,10 @@ package org.apache.kylin.dict;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class DictionaryInfo extends RootPersistentEntity {
@@ -38,8 +39,6 @@ public class DictionaryInfo extends RootPersistentEntity {
private String dataType;
@JsonProperty("input")
private TableSignature input;
- @JsonProperty("input_delimeter")
- private String inputDelimeter;
@JsonProperty("dictionary_class")
private String dictionaryClass;
@JsonProperty("cardinality")
@@ -50,7 +49,7 @@ public class DictionaryInfo extends RootPersistentEntity {
public DictionaryInfo() {
}
- public DictionaryInfo(String sourceTable, String sourceColumn, int sourceColumnIndex, String dataType, TableSignature input, String inputDelimeter) {
+ public DictionaryInfo(String sourceTable, String sourceColumn, int sourceColumnIndex, String dataType, TableSignature input) {
this.updateRandomUuid();
@@ -59,7 +58,6 @@ public class DictionaryInfo extends RootPersistentEntity {
this.sourceColumnIndex = sourceColumnIndex;
this.dataType = dataType;
this.input = input;
- this.inputDelimeter = inputDelimeter;
}
public DictionaryInfo(DictionaryInfo other) {
@@ -71,7 +69,6 @@ public class DictionaryInfo extends RootPersistentEntity {
this.sourceColumnIndex = other.sourceColumnIndex;
this.dataType = other.dataType;
this.input = other.input;
- this.inputDelimeter = other.inputDelimeter;
}
// ----------------------------------------------------------------------------
@@ -89,7 +86,7 @@ public class DictionaryInfo extends RootPersistentEntity {
// to decide if two dictionaries are built on the same table/column,
// regardless of their signature
public boolean isDictOnSameColumn(DictionaryInfo other) {
- return this.sourceTable.equalsIgnoreCase(other.sourceTable) && this.sourceColumn.equalsIgnoreCase(other.sourceColumn) && this.sourceColumnIndex == other.sourceColumnIndex && this.dataType.equalsIgnoreCase(other.dataType) && this.inputDelimeter.equalsIgnoreCase(other.inputDelimeter) && this.dictionaryClass.equalsIgnoreCase(other.dictionaryClass);
+ return this.sourceTable.equalsIgnoreCase(other.sourceTable) && this.sourceColumn.equalsIgnoreCase(other.sourceColumn) && this.sourceColumnIndex == other.sourceColumnIndex && this.dataType.equalsIgnoreCase(other.dataType) && this.dictionaryClass.equalsIgnoreCase(other.dictionaryClass);
}
public String getSourceTable() {
@@ -132,14 +129,6 @@ public class DictionaryInfo extends RootPersistentEntity {
this.input = input;
}
- public String getInputDelimeter() {
- return inputDelimeter;
- }
-
- public void setInputDelimeter(String inputDelimeter) {
- this.inputDelimeter = inputDelimeter;
- }
-
public String getDictionaryClass() {
return dictionaryClass;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 0ba1ed9..2e1e3db 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,7 +18,15 @@
package org.apache.kylin.dict;
-import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -29,7 +37,7 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.dict.lookup.FileTable;
import org.apache.kylin.dict.lookup.HiveTable;
import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataType;
@@ -37,14 +45,7 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Lists;
public class DictionaryManager {
@@ -204,10 +205,11 @@ public class DictionaryManager {
int srcColIdx = (Integer) tmp[2];
ReadableTable inpTable = (ReadableTable) tmp[3];
- if (!inpTable.exists())
+ TableSignature inputSig = inpTable.getSignature();
+ if (inputSig == null) // table does not exists
return null;
- DictionaryInfo dictInfo = new DictionaryInfo(srcTable, srcCol, srcColIdx, col.getDatatype(), inpTable.getSignature(), inpTable.getColumnDelimeter());
+ DictionaryInfo dictInfo = new DictionaryInfo(srcTable, srcCol, srcColIdx, col.getDatatype(), inputSig);
String dupDict = checkDupByInfo(dictInfo);
if (dupDict != null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index 0677860..139761e 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -23,15 +23,15 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
import org.apache.kylin.common.util.HadoopUtil;
/**
- * @author yangli9
- *
*/
public class FileTable implements ReadableTable {
+ public static final String DELIM_AUTO = "auto";
+ public static final String DELIM_COMMA = ",";
+
String path;
String delim;
int nColumns;
@@ -46,26 +46,17 @@ public class FileTable implements ReadableTable {
this.nColumns = nColumns;
}
- @Override
public String getColumnDelimeter() {
return delim;
}
@Override
- public boolean exists() throws IOException {
- FileSystem fs = HadoopUtil.getFileSystem(path);
- return fs.exists(new Path(path));
- }
-
- @Override
public TableReader getReader() throws IOException {
return new FileTableReader(path, delim, nColumns);
}
@Override
public TableSignature getSignature() throws IOException {
- if (!exists())
- throw new IllegalStateException("Table not exists");
FileSystem fs = HadoopUtil.getFileSystem(path);
FileStatus status = fs.getFileStatus(new Path(path));
return new TableSignature(path, status.getLen(), status.getModificationTime());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
index 3631b25..bf46963 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
/**
* Tables are typically CSV or SEQ file.
@@ -61,7 +61,7 @@ public class FileTableReader implements TableReader {
private int expectedColumnNumber = -1; // helps delimiter detection
public FileTableReader(String filePath, int expectedColumnNumber) throws IOException {
- this(filePath, ReadableTable.DELIM_AUTO, expectedColumnNumber);
+ this(filePath, FileTable.DELIM_AUTO, expectedColumnNumber);
}
public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
@@ -94,11 +94,6 @@ public class FileTableReader implements TableReader {
}
@Override
- public void setExpectedColumnNumber(int expectedColumnNumber) {
- this.expectedColumnNumber = expectedColumnNumber;
- }
-
- @Override
public boolean next() throws IOException {
curLine = reader.nextLine();
curColumns = null;
@@ -112,7 +107,7 @@ public class FileTableReader implements TableReader {
@Override
public String[] getRow() {
if (curColumns == null) {
- if (ReadableTable.DELIM_AUTO.equals(delim))
+ if (FileTable.DELIM_AUTO.equals(delim))
delim = autoDetectDelim(curLine);
if (delim == null)
@@ -128,7 +123,7 @@ public class FileTableReader implements TableReader {
String str[] = StringSplitter.split(line, delim);
// un-escape CSV
- if (ReadableTable.DELIM_COMMA.equals(delim)) {
+ if (FileTable.DELIM_COMMA.equals(delim)) {
for (int i = 0; i < str.length; i++) {
str[i] = unescapeCsv(str[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
index c634647..cbd3b04 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
@@ -57,16 +57,6 @@ public class HiveTable implements ReadableTable {
}
@Override
- public String getColumnDelimeter() throws IOException {
- return getFileTable().getColumnDelimeter();
- }
-
- @Override
- public boolean exists() throws IOException {
- return true;
- }
-
- @Override
public TableReader getReader() throws IOException {
return new HiveTableReader(database, hiveTable);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
index c2af155..96aa0d1 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
@@ -33,11 +33,10 @@ import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
/**
* An implementation of TableReader with HCatalog for Hive table.
- * @author shaoshi
- *
*/
public class HiveTableReader implements TableReader {
@@ -135,11 +134,6 @@ public class HiveTableReader implements TableReader {
}
@Override
- public void setExpectedColumnNumber(int expectedColumnNumber) {
-
- }
-
- @Override
public void close() throws IOException {
this.readCntxt = null;
this.currentHCatRecordItr = null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 50d4cbc..7e83197 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.util.Pair;
import com.google.common.collect.Sets;
+
import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
import org.apache.kylin.metadata.model.TableDesc;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
index 3fe6773..2e6af14 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
@@ -18,22 +18,116 @@
package org.apache.kylin.dict.lookup;
+import java.io.Closeable;
import java.io.IOException;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
/**
- * @author yangli9
*/
public interface ReadableTable {
- public static final String DELIM_AUTO = "auto";
- public static final String DELIM_COMMA = ",";
-
+ /** Returns a reader to read the table. */
public TableReader getReader() throws IOException;
+ /** Used to detect table modifications mainly. Return null in case table does not exist. */
public TableSignature getSignature() throws IOException;
- public String getColumnDelimeter() throws IOException;
+ public interface TableReader extends Closeable {
+
+ /** Move to the next row, return false if no more record. */
+ public boolean next() throws IOException;
+
+ /** Get the current row. */
+ public String[] getRow();
+
+ }
+
+ // ============================================================================
+
+ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+ public class TableSignature {
+
+ @JsonProperty("path")
+ private String path;
+ @JsonProperty("size")
+ private long size;
+ @JsonProperty("last_modified_time")
+ private long lastModifiedTime;
+
+ // for JSON serialization
+ public TableSignature() {
+ }
+
+ public TableSignature(String path, long size, long lastModifiedTime) {
+ super();
+ this.path = path;
+ this.size = size;
+ this.lastModifiedTime = lastModifiedTime;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public void setLastModifiedTime(long lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public long getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
+ result = prime * result + ((path == null) ? 0 : path.hashCode());
+ result = prime * result + (int) (size ^ (size >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TableSignature other = (TableSignature) obj;
+ if (lastModifiedTime != other.lastModifiedTime)
+ return false;
+ if (path == null) {
+ if (other.path != null)
+ return false;
+ } else if (!path.equals(other.path))
+ return false;
+ if (size != other.size)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
+ }
- public boolean exists() throws IOException;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index a2f6d5d..3b6db77 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -24,9 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 90cb5dd..aa46212 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -47,8 +47,6 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
@JsonProperty("signature")
private TableSignature signature;
- @JsonProperty("column_delimeter")
- private String columnDelimeter;
@JsonProperty("useDictionary")
private boolean useDictionary;
@@ -61,13 +59,11 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
SnapshotTable(ReadableTable table) throws IOException {
this.signature = table.getSignature();
- this.columnDelimeter = table.getColumnDelimeter();
this.useDictionary = true;
}
public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
this.signature = table.getSignature();
- this.columnDelimeter = table.getColumnDelimeter();
int maxIndex = tableDesc.getMaxColumnIndex();
@@ -134,11 +130,6 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
@Override
public void close() throws IOException {
}
-
- @Override
- public void setExpectedColumnNumber(int expectedColumnNumber) {
- // noop
- }
};
}
@@ -147,16 +138,6 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable
return signature;
}
- @Override
- public String getColumnDelimeter() throws IOException {
- return columnDelimeter;
- }
-
- @Override
- public boolean exists() throws IOException {
- return true;
- }
-
/**
* a naive implementation
*
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java
deleted file mode 100644
index e31ff96..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Tables are typically CSV or SEQ file.
- *
- * @author yangli9
- */
-public interface TableReader extends Closeable {
-
- public boolean next() throws IOException;
-
- public String[] getRow();
-
- public void setExpectedColumnNumber(int expectedColumnNumber);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java
deleted file mode 100644
index b9ecc71..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/TableSignature.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * @author yangli9
- */
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class TableSignature {
-
- @JsonProperty("path")
- private String path;
- @JsonProperty("size")
- private long size;
- @JsonProperty("last_modified_time")
- private long lastModifiedTime;
-
- // for JSON serialization
- public TableSignature() {
- }
-
- public TableSignature(String path, long size, long lastModifiedTime) {
- super();
- this.path = path;
- this.size = size;
- this.lastModifiedTime = lastModifiedTime;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- public void setSize(long size) {
- this.size = size;
- }
-
- public void setLastModifiedTime(long lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- public String getPath() {
- return path;
- }
-
- public long getSize() {
- return size;
- }
-
- public long getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- // ============================================================================
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
- result = prime * result + ((path == null) ? 0 : path.hashCode());
- result = prime * result + (int) (size ^ (size >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TableSignature other = (TableSignature) obj;
- if (lastModifiedTime != other.lastModifiedTime)
- return false;
- if (path == null) {
- if (other.path != null)
- return false;
- } else if (!path.equals(other.path))
- return false;
- if (size != other.size)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
index e59fcda..7fc37a6 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
@@ -18,20 +18,19 @@
package org.apache.kylin.dict;
+import static org.junit.Assert.*;
+
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.dict.lookup.HiveTable;
+import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.dict.lookup.TableReader;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
/**
* @author yangli9
*
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java b/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
index 2650aad..cfecaee 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
@@ -24,11 +24,10 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.kylin.dict.lookup.FileTable;
import org.apache.kylin.dict.lookup.FileTableReader;
import org.junit.Test;
-import org.apache.kylin.dict.lookup.ReadableTable;
-
/**
* @author yangli9
*
@@ -38,7 +37,7 @@ public class TableReaderTest {
@Test
public void testBasicReader() throws IOException {
File f = new File("src/test/resources/dict/DW_SITES");
- FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), ReadableTable.DELIM_AUTO, 10);
+ FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), FileTable.DELIM_AUTO, 10);
while (reader.next()) {
assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
break;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index da587db..5ec963e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -18,11 +18,10 @@
package org.apache.kylin.job.hadoop.cube;
+import java.io.IOException;
+
import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -32,19 +31,14 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/**
* @author yangli9
*/
@@ -80,7 +74,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
- CubeSegment newSegment = cubeInstance.getSegment(segmentName, SegmentStatusEnum.NEW);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index e595089..980b375 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -1,13 +1,20 @@
package org.apache.kylin.job.streaming;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,8 +43,7 @@ import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
@@ -53,13 +59,14 @@ import org.apache.kylin.streaming.MicroStreamBatchConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
/**
*/
@@ -130,7 +137,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
signature.setLastModifiedTime(System.currentTimeMillis());
signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
signature.setSize(endOffset - startOffset);
- DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(), tblColRef.getName(), tblColRef.getColumnDesc().getZeroBasedIndex(), tblColRef.getDatatype(), signature, ReadableTable.DELIM_AUTO);
+ DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(), tblColRef.getName(), tblColRef.getColumnDesc().getZeroBasedIndex(), tblColRef.getDatatype(), signature);
logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988f1369/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 01e57d5..5886324 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -18,6 +18,14 @@
package org.apache.kylin.job.hadoop.cube;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
@@ -26,8 +34,12 @@ import org.apache.kylin.cube.CubeBuilder;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.dict.*;
-import org.apache.kylin.dict.lookup.TableSignature;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
@@ -38,14 +50,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
/**
* @author honma
*/
@@ -69,7 +73,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
signature.setLastModifiedTime(System.currentTimeMillis());
signature.setPath("fake_common_dict");
- DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", signature, "");
+ DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", signature);
List<byte[]> values = new ArrayList<byte[]>();
values.add(new byte[] { 101, 101, 101 });
@@ -119,7 +123,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
signature.setLastModifiedTime(System.currentTimeMillis());
signature.setPath("fake_dict_for" + lfn.getName() + segment.getName());
- DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getTable(), lfn.getColumnDesc().getName(), lfn.getColumnDesc().getZeroBasedIndex(), "string", signature, "");
+ DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getTable(), lfn.getColumnDesc().getName(), lfn.getColumnDesc().getZeroBasedIndex(), "string", signature);
List<byte[]> values = new ArrayList<byte[]>();
values.add(new byte[] { 97, 97, 97 });