You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/13 09:25:29 UTC

incubator-kylin git commit: KYLIN-889 Support more than one HDFS file for a lookup table

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.7-staging 4176211a7 -> 575e0b8d2


KYLIN-889 Support more than one HDFS file for a lookup table


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/575e0b8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/575e0b8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/575e0b8d

Branch: refs/heads/0.7-staging
Commit: 575e0b8d2e3b1569b14cbce9f7fd8e2f12848864
Parents: 4176211
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 13 15:24:41 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 13 15:25:18 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  4 +
 .../apache/kylin/common/util/HiveClient.java    | 10 ---
 .../org/apache/kylin/dict/lookup/FileTable.java | 48 +++++++----
 .../org/apache/kylin/dict/lookup/HiveTable.java | 91 ++++++--------------
 .../kylin/dict/lookup/SnapshotManager.java      | 10 ++-
 5 files changed, 64 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index aa2f224..bab09ac 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -425,6 +425,10 @@ public class KylinConfig {
         return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
     }
     
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+    }
+    
     public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
index 86a403d..69e16b3 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
@@ -158,18 +158,8 @@ public class HiveClient {
         return result;
     }
 
-    /**
-     *
-     * @param database
-     * @param tableName
-     * @throws Exception
-     */
-
     public boolean isNativeTable(String database, String tableName)  throws Exception{
-
         return !MetaStoreUtils.isNonNativeTable(getMetaStoreClient().getTable(database, tableName));
-
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index b1b0e4d..1e9b1e8 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -19,12 +19,14 @@
 package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
 
 /**
  * @author yangli9
@@ -38,24 +40,15 @@ public class FileTable implements ReadableTable {
     String path;
     String delim;
     int nColumns;
-    boolean nativeTable;
 
     public FileTable(String path, int nColumns) {
-        this(path, DELIM_AUTO, nColumns, true);
+        this(path, DELIM_AUTO, nColumns);
     }
 
-    public FileTable(String path, String delim, int nColumns, boolean nativeTable) {
+    public FileTable(String path, String delim, int nColumns) {
         this.path = path;
         this.delim = delim;
         this.nColumns = nColumns;
-        this.nativeTable = nativeTable;
-    }
-
-    public FileTable(String path, int nColumns, boolean nativeTable) {
-        this.path = path;
-        this.delim = DELIM_AUTO;
-        this.nColumns = nColumns;
-        this.nativeTable = nativeTable;
     }
 
     @Override
@@ -65,16 +58,35 @@ public class FileTable implements ReadableTable {
 
     @Override
     public TableSignature getSignature() throws IOException {
-        FileSystem fs = HadoopUtil.getFileSystem(path);
-        FileStatus status = fs.getFileStatus(new Path(path));
-        if (nativeTable) {
-            return new TableSignature(path, status.getLen(), status.getModificationTime());
-        }
-        return new TableSignature(path, status.getLen(), System.currentTimeMillis());
+        Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
+        return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
     }
 
     @Override
     public String toString() {
         return path;
     }
+    
+    public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
+        FileSystem fs = HadoopUtil.getFileSystem(path);
+        
+        // get all contained files if path is directory
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(path));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(path));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+        
+        long size = 0;
+        long lastModified = 0;
+        for (FileStatus file : allFiles) {
+            size += file.getLen();
+            lastModified = Math.max(lastModified, file.getModificationTime());
+        }
+        
+        return new Pair<Long, Long>(size, lastModified);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
index e926a89..c90c1c8 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
@@ -18,44 +18,31 @@
 
 package org.apache.kylin.dict.lookup;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * @author yangli9
- * 
  */
 public class HiveTable implements ReadableTable {
 
     private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
 
-    private String database;
-    private String hiveTable;
-    private int nColumns;
-    private String hdfsLocation;
-    private FileTable fileTable;
+    final private String database;
+    final private String hiveTable;
+
     private HiveClient hiveClient;
-    private boolean nativeTable;
 
     public HiveTable(MetadataManager metaMgr, String table) {
         TableDesc tableDesc = metaMgr.getTableDesc(table);
         this.database = tableDesc.getDatabase();
         this.hiveTable = tableDesc.getName();
-        this.nColumns = tableDesc.getColumnCount();
     }
 
     @Override
@@ -65,65 +52,36 @@ public class HiveTable implements ReadableTable {
 
     @Override
     public TableSignature getSignature() throws IOException {
-        return getFileTable().getSignature();
-    }
-
-    private FileTable getFileTable() throws IOException {
         try {
-            if (fileTable == null) {
-                nativeTable = getHiveClient().isNativeTable(database, hiveTable);
-                fileTable = new FileTable(getHDFSLocation(), nColumns, nativeTable);
+            String path = computeHDFSLocation();
+            Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+            long size = sizeAndLastModified.getFirst();
+            long lastModified = sizeAndLastModified.getSecond();
+            
+            // for non-native hive table, cannot rely on size & last modified on HDFS
+            if (getHiveClient().isNativeTable(database, hiveTable) == false) {
+                lastModified = System.currentTimeMillis(); // assume table is ever changing
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
-        }
-        return fileTable;
-    }
 
-    public String getHDFSLocation() throws IOException {
-        if (hdfsLocation == null) {
-            hdfsLocation = computeHDFSLocation();
+            return new TableSignature(path, size, lastModified);
+            
+        } catch (Exception e) {
+            if (e instanceof IOException)
+                throw (IOException) e;
+            else
+                throw new IOException(e);
         }
-        return hdfsLocation;
     }
 
-    private String computeHDFSLocation() throws IOException {
+    private String computeHDFSLocation() throws Exception {
 
         String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
         if (override != null) {
             logger.debug("Override hive table location " + hiveTable + " -- " + override);
             return override;
         }
-        
-        String hdfsDir = null;
-        try {
-            hdfsDir = getHiveClient().getHiveTableLocation(database, hiveTable);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
-        }
-
-        if (nativeTable) {
-            FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
-            FileStatus file = findOnlyFile(hdfsDir, fs);
-            return file.getPath().toString();
-        } else {
-            return hdfsDir;
-        }
 
-    }
-
-    private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
-        FileStatus[] files = fs.listStatus(new Path(hdfsDir));
-        ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
-        for (FileStatus f : files) {
-            if (f.getLen() > 0)
-                nonZeroFiles.add(f);
-        }
-        if (nonZeroFiles.size() != 1)
-            throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
-        return nonZeroFiles.get(0);
+        return getHiveClient().getHiveTableLocation(database, hiveTable);
     }
 
     @Override
@@ -131,8 +89,7 @@ public class HiveTable implements ReadableTable {
         return "hive: database=[" + database + "], table=[" + hiveTable + "]";
     }
 
-    public HiveClient getHiveClient()  {
-
+    public HiveClient getHiveClient() {
         if (hiveClient == null) {
             hiveClient = new HiveClient();
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/575e0b8d/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 51d4094..af2d2a0 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -55,10 +55,7 @@ public class SnapshotManager {
     // ============================================================================
 
     private KylinConfig config;
-    private ConcurrentHashMap<String, SnapshotTable> snapshotCache; // resource
-
-    // path ==>
-    // SnapshotTable
+    private ConcurrentHashMap<String, SnapshotTable> snapshotCache; // resource path ==> SnapshotTable
 
     private SnapshotManager(KylinConfig config) {
         this.config = config;
@@ -94,6 +91,11 @@ public class SnapshotManager {
             return getSnapshotTable(dup);
         }
 
+        if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
+            throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
+                    + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+        }
+
         snapshot.takeSnapshot(table, tableDesc);
 
         return trySaveNewSnapshot(snapshot);