You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/16 05:27:44 UTC

[04/11] incubator-kylin git commit: KYLIN-889 Support more than one HDFS files of lookup table

KYLIN-889 Support more than one HDFS files of lookup table


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0c9280d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0c9280d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0c9280d5

Branch: refs/heads/KYLIN-876
Commit: 0c9280d544ada991cdd19de209a252ceda344c39
Parents: 2a29f15
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 13 15:37:00 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 13 15:37:00 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  4 +
 .../apache/kylin/common/util/HiveClient.java    |  4 +
 .../org/apache/kylin/dict/lookup/FileTable.java | 31 ++++++-
 .../org/apache/kylin/dict/lookup/HiveTable.java | 87 +++++++-------------
 .../kylin/dict/lookup/SnapshotManager.java      |  5 ++
 5 files changed, 71 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0c9280d5/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index fd0a7d9..ef6b7d1 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -434,6 +434,10 @@ public class KylinConfig {
         return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
     }
     
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+    }
+    
     public int getHBaseRegionCut(String capacity) {
         String cut;
         switch (capacity) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0c9280d5/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
index cfeea31..a5be14e 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -155,4 +156,7 @@ public class HiveClient {
         return result;
     }
 
+    public boolean isNativeTable(String database, String tableName)  throws Exception{
+        return !MetaStoreUtils.isNonNativeTable(getMetaStoreClient().getTable(database, tableName));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0c9280d5/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index 139761e..383513f 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -19,11 +19,14 @@
 package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
 
 /**
  */
@@ -57,13 +60,35 @@ public class FileTable implements ReadableTable {
 
     @Override
     public TableSignature getSignature() throws IOException {
-        FileSystem fs = HadoopUtil.getFileSystem(path);
-        FileStatus status = fs.getFileStatus(new Path(path));
-        return new TableSignature(path, status.getLen(), status.getModificationTime());
+        Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
+        return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
     }
 
     @Override
     public String toString() {
         return path;
     }
+    
+    public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
+        FileSystem fs = HadoopUtil.getFileSystem(path);
+        
+        // get all contained files if path is directory
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(path));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(path));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+        
+        long size = 0;
+        long lastModified = 0;
+        for (FileStatus file : allFiles) {
+            size += file.getLen();
+            lastModified = Math.max(lastModified, file.getModificationTime());
+        }
+        
+        return new Pair<Long, Long>(size, lastModified);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0c9280d5/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
index cbd3b04..68e9b82 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
@@ -18,42 +18,31 @@
 
 package org.apache.kylin.dict.lookup;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * @author yangli9
- * 
  */
 public class HiveTable implements ReadableTable {
 
     private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
 
-    private String database;
-    private String hiveTable;
-    private int nColumns;
-    private String hdfsLocation;
-    private FileTable fileTable;
+    final private String database;
+    final private String hiveTable;
+    
+    private HiveClient hiveClient;
 
     public HiveTable(MetadataManager metaMgr, String table) {
         TableDesc tableDesc = metaMgr.getTableDesc(table);
         this.database = tableDesc.getDatabase();
         this.hiveTable = tableDesc.getName();
-        this.nColumns = tableDesc.getColumnCount();
     }
 
     @Override
@@ -63,60 +52,44 @@ public class HiveTable implements ReadableTable {
 
     @Override
     public TableSignature getSignature() throws IOException {
-        return getFileTable().getSignature();
-    }
+        try {
+            String path = computeHDFSLocation();
+            Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+            long size = sizeAndLastModified.getFirst();
+            long lastModified = sizeAndLastModified.getSecond();
 
-    private FileTable getFileTable() throws IOException {
-        if (fileTable == null) {
-            fileTable = new FileTable(getHDFSLocation(true), nColumns);
-        }
-        return fileTable;
-    }
+            // for non-native hive table, cannot rely on size & last modified on HDFS
+            if (getHiveClient().isNativeTable(database, hiveTable) == false) {
+                lastModified = System.currentTimeMillis(); // assume table is ever changing
+            }
+
+            return new TableSignature(path, size, lastModified);
 
-    public String getHDFSLocation(boolean needFilePath) throws IOException {
-        if (hdfsLocation == null) {
-            hdfsLocation = computeHDFSLocation(needFilePath);
+        } catch (Exception e) {
+            if (e instanceof IOException)
+                throw (IOException) e;
+            else
+                throw new IOException(e);
         }
-        return hdfsLocation;
     }
 
-    private String computeHDFSLocation(boolean needFilePath) throws IOException {
+    private String computeHDFSLocation() throws Exception {
 
         String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
         if (override != null) {
             logger.debug("Override hive table location " + hiveTable + " -- " + override);
             return override;
         }
-        
-        String hdfsDir = null;
-        try {
-            HiveClient hiveClient = new HiveClient();
-            hdfsDir = hiveClient.getHiveTableLocation(database, hiveTable);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
-        }
-
-        if (needFilePath) {
-            FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
-            FileStatus file = findOnlyFile(hdfsDir, fs);
-            return file.getPath().toString();
-        } else {
-            return hdfsDir;
-        }
 
+        return getHiveClient().getHiveTableLocation(database, hiveTable);
     }
 
-    private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
-        FileStatus[] files = fs.listStatus(new Path(hdfsDir));
-        ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
-        for (FileStatus f : files) {
-            if (f.getLen() > 0)
-                nonZeroFiles.add(f);
+    public HiveClient getHiveClient() {
+
+        if (hiveClient == null) {
+            hiveClient = new HiveClient();
         }
-        if (nonZeroFiles.size() != 1)
-            throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
-        return nonZeroFiles.get(0);
+        return hiveClient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0c9280d5/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 3b6db77..e9d74bb 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -94,6 +94,11 @@ public class SnapshotManager {
             return getSnapshotTable(dup);
         }
 
+        if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
+            throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
+                    + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+        }
+
         snapshot.takeSnapshot(table, tableDesc);
 
         return trySaveNewSnapshot(snapshot);