You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this text.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/13 09:25:29 UTC
incubator-kylin git commit: KYLIN-889 Support more than one HDFS files of lookup table
Repository: incubator-kylin
Updated Branches:
refs/heads/0.7-staging 4176211a7 -> 575e0b8d2
KYLIN-889 Support more than one HDFS files of lookup table
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/575e0b8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/575e0b8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/575e0b8d
Branch: refs/heads/0.7-staging
Commit: 575e0b8d2e3b1569b14cbce9f7fd8e2f12848864
Parents: 4176211
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 13 15:24:41 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 13 15:25:18 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 4 +
.../apache/kylin/common/util/HiveClient.java | 10 ---
.../org/apache/kylin/dict/lookup/FileTable.java | 48 +++++++----
.../org/apache/kylin/dict/lookup/HiveTable.java | 91 ++++++--------------
.../kylin/dict/lookup/SnapshotManager.java | 10 ++-
5 files changed, 64 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index aa2f224..bab09ac 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -425,6 +425,10 @@ public class KylinConfig {
return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
}
+ public int getTableSnapshotMaxMB() {
+ return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+ }
+
public int getScanThreshold() {
return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
index 86a403d..69e16b3 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
@@ -158,18 +158,8 @@ public class HiveClient {
return result;
}
- /**
- *
- * @param database
- * @param tableName
- * @throws Exception
- */
-
public boolean isNativeTable(String database, String tableName) throws Exception{
-
return !MetaStoreUtils.isNonNativeTable(getMetaStoreClient().getTable(database, tableName));
-
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index b1b0e4d..1e9b1e8 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -19,12 +19,14 @@
package org.apache.kylin.dict.lookup;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
/**
* @author yangli9
@@ -38,24 +40,15 @@ public class FileTable implements ReadableTable {
String path;
String delim;
int nColumns;
- boolean nativeTable;
public FileTable(String path, int nColumns) {
- this(path, DELIM_AUTO, nColumns, true);
+ this(path, DELIM_AUTO, nColumns);
}
- public FileTable(String path, String delim, int nColumns, boolean nativeTable) {
+ public FileTable(String path, String delim, int nColumns) {
this.path = path;
this.delim = delim;
this.nColumns = nColumns;
- this.nativeTable = nativeTable;
- }
-
- public FileTable(String path, int nColumns, boolean nativeTable) {
- this.path = path;
- this.delim = DELIM_AUTO;
- this.nColumns = nColumns;
- this.nativeTable = nativeTable;
}
@Override
@@ -65,16 +58,35 @@ public class FileTable implements ReadableTable {
@Override
public TableSignature getSignature() throws IOException {
- FileSystem fs = HadoopUtil.getFileSystem(path);
- FileStatus status = fs.getFileStatus(new Path(path));
- if (nativeTable) {
- return new TableSignature(path, status.getLen(), status.getModificationTime());
- }
- return new TableSignature(path, status.getLen(), System.currentTimeMillis());
+ Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
+ return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
}
@Override
public String toString() {
return path;
}
+
+ public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
+ FileSystem fs = HadoopUtil.getFileSystem(path);
+
+ // get all contained files if path is directory
+ ArrayList<FileStatus> allFiles = new ArrayList<>();
+ FileStatus status = fs.getFileStatus(new Path(path));
+ if (status.isFile()) {
+ allFiles.add(status);
+ } else {
+ FileStatus[] listStatus = fs.listStatus(new Path(path));
+ allFiles.addAll(Arrays.asList(listStatus));
+ }
+
+ long size = 0;
+ long lastModified = 0;
+ for (FileStatus file : allFiles) {
+ size += file.getLen();
+ lastModified = Math.max(lastModified, file.getModificationTime());
+ }
+
+ return new Pair<Long, Long>(size, lastModified);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
index e926a89..c90c1c8 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
@@ -18,44 +18,31 @@
package org.apache.kylin.dict.lookup;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * @author yangli9
- *
*/
public class HiveTable implements ReadableTable {
private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
- private String database;
- private String hiveTable;
- private int nColumns;
- private String hdfsLocation;
- private FileTable fileTable;
+ final private String database;
+ final private String hiveTable;
+
private HiveClient hiveClient;
- private boolean nativeTable;
public HiveTable(MetadataManager metaMgr, String table) {
TableDesc tableDesc = metaMgr.getTableDesc(table);
this.database = tableDesc.getDatabase();
this.hiveTable = tableDesc.getName();
- this.nColumns = tableDesc.getColumnCount();
}
@Override
@@ -65,65 +52,36 @@ public class HiveTable implements ReadableTable {
@Override
public TableSignature getSignature() throws IOException {
- return getFileTable().getSignature();
- }
-
- private FileTable getFileTable() throws IOException {
try {
- if (fileTable == null) {
- nativeTable = getHiveClient().isNativeTable(database, hiveTable);
- fileTable = new FileTable(getHDFSLocation(), nColumns, nativeTable);
+ String path = computeHDFSLocation();
+ Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+ long size = sizeAndLastModified.getFirst();
+ long lastModified = sizeAndLastModified.getSecond();
+
+ // for non-native hive table, cannot rely on size & last modified on HDFS
+ if (getHiveClient().isNativeTable(database, hiveTable) == false) {
+ lastModified = System.currentTimeMillis(); // assume table is ever changing
}
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
- }
- return fileTable;
- }
- public String getHDFSLocation() throws IOException {
- if (hdfsLocation == null) {
- hdfsLocation = computeHDFSLocation();
+ return new TableSignature(path, size, lastModified);
+
+ } catch (Exception e) {
+ if (e instanceof IOException)
+ throw (IOException) e;
+ else
+ throw new IOException(e);
}
- return hdfsLocation;
}
- private String computeHDFSLocation() throws IOException {
+ private String computeHDFSLocation() throws Exception {
String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
if (override != null) {
logger.debug("Override hive table location " + hiveTable + " -- " + override);
return override;
}
-
- String hdfsDir = null;
- try {
- hdfsDir = getHiveClient().getHiveTableLocation(database, hiveTable);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
- }
-
- if (nativeTable) {
- FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
- FileStatus file = findOnlyFile(hdfsDir, fs);
- return file.getPath().toString();
- } else {
- return hdfsDir;
- }
- }
-
- private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
- FileStatus[] files = fs.listStatus(new Path(hdfsDir));
- ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
- for (FileStatus f : files) {
- if (f.getLen() > 0)
- nonZeroFiles.add(f);
- }
- if (nonZeroFiles.size() != 1)
- throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
- return nonZeroFiles.get(0);
+ return getHiveClient().getHiveTableLocation(database, hiveTable);
}
@Override
@@ -131,8 +89,7 @@ public class HiveTable implements ReadableTable {
return "hive: database=[" + database + "], table=[" + hiveTable + "]";
}
- public HiveClient getHiveClient() {
-
+ public HiveClient getHiveClient() {
if (hiveClient == null) {
hiveClient = new HiveClient();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 51d4094..af2d2a0 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -55,10 +55,7 @@ public class SnapshotManager {
// ============================================================================
private KylinConfig config;
- private ConcurrentHashMap<String, SnapshotTable> snapshotCache; // resource
-
- // path ==>
- // SnapshotTable
+ private ConcurrentHashMap<String, SnapshotTable> snapshotCache; // resource path ==> SnapshotTable
private SnapshotManager(KylinConfig config) {
this.config = config;
@@ -94,6 +91,11 @@ public class SnapshotManager {
return getSnapshotTable(dup);
}
+ if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
+ throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
+ + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+ }
+
snapshot.takeSnapshot(table, tableDesc);
return trySaveNewSnapshot(snapshot);