You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/16 05:27:48 UTC

[08/11] incubator-kylin git commit: KYLIN-877 Abstract Input Source interface and make hive a default impl

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..975d69f
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.*;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.TableSourceFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+    /** Builds a snapshot of EDW.TEST_SITES, reloads it after wiping the cache and checks it row by row against the source table. */
+    @Test
+    public void basicTest() throws Exception {
+        String tableName = "EDW.TEST_SITES";
+        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
+        ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
+        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
+
+        snapshotMgr.wipeoutCache();
+        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
+        // compare hive & snapshot; both readers are Closeable, so close them even when an assertion fails
+        TableReader hiveReader = hiveTable.getReader();
+        try {
+            TableReader snapshotReader = snapshot.getReader();
+            try {
+                while (true) {
+                    boolean hiveNext = hiveReader.next();
+                    assertEquals(hiveNext, snapshotReader.next());
+                    if (!hiveNext)
+                        break;
+                    assertArrayEquals(hiveReader.getRow(), snapshotReader.getRow());
+                }
+            } finally {
+                snapshotReader.close();
+            }
+        } finally {
+            hiveReader.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index e1ab0af..0124b9b 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -24,7 +24,7 @@ import java.util.List;
  */
 public interface IJoinedFlatTableDesc {
 
-    public String getTableName(String jobUUID);
+    public String getTableName();
 
     public List<IntermediateColumnDesc> getColumnList();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
deleted file mode 100644
index fe5c2b3..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.util;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Management class to sync hive table metadata with command See main method for
- * how to use the class
- *
- * @author jianliu
- */
-public class HiveSourceTableLoader {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
-
-    public static final String OUTPUT_SURFIX = "json";
-    public static final String TABLE_FOLDER_NAME = "table";
-    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
-    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
-
-        Map<String, Set<String>> db2tables = Maps.newHashMap();
-        for (String table : hiveTables) {
-            String[] parts = HadoopUtil.parseHiveTableName(table);
-            Set<String> set = db2tables.get(parts[0]);
-            if (set == null) {
-                set = Sets.newHashSet();
-                db2tables.put(parts[0], set);
-            }
-            set.add(parts[1]);
-        }
-
-        // extract from hive
-        Set<String> loadedTables = Sets.newHashSet();
-        for (String database : db2tables.keySet()) {
-            List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
-            loadedTables.addAll(loaded);
-        }
-
-        return loadedTables;
-    }
-
-    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
-
-        List<String> loadedTables = Lists.newArrayList();
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        for (String tableName : tables) {
-            Table table = null;
-            HiveClient hiveClient = new HiveClient();
-            List<FieldSchema> partitionFields = null;
-            List<FieldSchema> fields = null;
-            try {
-                table = hiveClient.getHiveTable(database, tableName);
-                partitionFields = table.getPartitionKeys();
-                fields = hiveClient.getHiveTableFields(database, tableName);
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException(e);
-            }
-
-            if (fields != null && partitionFields != null && partitionFields.size() > 0) {
-                fields.addAll(partitionFields);
-            }
-
-            long tableSize = hiveClient.getFileSizeForTable(table);
-            long tableFileNum = hiveClient.getFileNumberForTable(table);
-            TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
-            if (tableDesc == null) {
-                tableDesc = new TableDesc();
-                tableDesc.setDatabase(database.toUpperCase());
-                tableDesc.setName(tableName.toUpperCase());
-                tableDesc.setUuid(UUID.randomUUID().toString());
-                tableDesc.setLastModified(0);
-            }
-
-            int columnNumber = fields.size();
-            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
-            for (int i = 0; i < columnNumber; i++) {
-                FieldSchema field = fields.get(i);
-                ColumnDesc cdesc = new ColumnDesc();
-                cdesc.setName(field.getName().toUpperCase());
-                cdesc.setDatatype(field.getType());
-                cdesc.setId(String.valueOf(i + 1));
-                columns.add(cdesc);
-            }
-            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
-
-            StringBuffer partitionColumnString = new StringBuffer();
-            for (int i = 0, n = partitionFields.size(); i < n; i++) {
-                if (i > 0)
-                    partitionColumnString.append(", ");
-                partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
-            }
-
-            Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
-
-            if (map == null) {
-                map = Maps.newHashMap();
-            }
-            map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
-            map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
-            map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
-            map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
-            map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
-            map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
-            map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
-            map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
-            map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
-            map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(partitionFields != null && partitionFields.size() > 0).toString());
-
-            metaMgr.saveSourceTable(tableDesc);
-            metaMgr.saveTableExd(tableDesc.getIdentity(), map);
-            loadedTables.add(tableDesc.getIdentity());
-        }
-
-
-        return loadedTables;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ITableSource.java b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
new file mode 100644
index 0000000..b2cd3b8
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface ITableSource {
+    /** Returns an object of the requested build-engine interface type (exact contract is up to the implementation -- TODO confirm). */
+    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+    /** Creates a ReadableTable for the table described by tableDesc. */
+    public ReadableTable createReadableTable(TableDesc tableDesc);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
new file mode 100644
index 0000000..19c7790
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
+/**
+ * A table that can be read row by row and that exposes a signature for detecting modifications.
+ */
+public interface ReadableTable {
+    /** Returns a reader to read the table. */
+    public TableReader getReader() throws IOException;
+
+    /** Used to detect table modifications mainly. Return null in case table does not exist. */
+    public TableSignature getSignature() throws IOException;
+    /** Cursor over the table's rows, advanced with next(); it is Closeable, so callers should close it when done. */
+    public interface TableReader extends Closeable {
+
+        /** Move to the next row, return false if no more record. */
+        public boolean next() throws IOException;
+
+        /** Get the current row. */
+        public String[] getRow();
+        
+    }
+    
+    // ============================================================================
+    /** Path, size and last-modified time of a table; equal when all three match. Only the @JsonProperty fields are exposed to JSON. */
+    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+    public class TableSignature {
+
+        @JsonProperty("path")
+        private String path;
+        @JsonProperty("size")
+        private long size;
+        @JsonProperty("last_modified_time")
+        private long lastModifiedTime;
+
+        // for JSON serialization
+        public TableSignature() {
+        }
+
+        public TableSignature(String path, long size, long lastModifiedTime) {
+            super();
+            this.path = path;
+            this.size = size;
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setLastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public long getLastModifiedTime() {
+            return lastModifiedTime;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
+            result = prime * result + ((path == null) ? 0 : path.hashCode());
+            result = prime * result + (int) (size ^ (size >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            TableSignature other = (TableSignature) obj;
+            if (lastModifiedTime != other.lastModifiedTime)
+                return false;
+            if (path == null) {
+                if (other.path != null)
+                    return false;
+            } else if (!path.equals(other.path))
+                return false;
+            if (size != other.size)
+                return false;
+            return true;
+        }
+        /** Renders all three fields, labelled with this class's own name so log output points at the right type. */
+        @Override
+        public String toString() {
+            return "TableSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
new file mode 100644
index 0000000..ec75c44
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.TableDesc;
+
+public class TableSourceFactory {
+    /** Default table source, created once at class load through ClassUtil.newInstance from its class name; never reassigned, hence final. */
+    private static final ITableSource dft = (ITableSource) ClassUtil.newInstance("org.apache.kylin.source.hive.HiveTableSource");
+    /** Creates a ReadableTable for the given table by delegating to the default source. */
+    public static ReadableTable createReadableTable(TableDesc table) {
+        return dft.createReadableTable(table);
+    }
+    /** Returns the default source's adapter for engineInterface; the table argument is currently not consulted. */
+    public static <T> T createEngineAdapter(TableDesc table, Class<T> engineInterface) {
+        return dft.adaptToBuildEngine(engineInterface);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
deleted file mode 100644
index f357855..0000000
--- a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.tool;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
-    @Before
-    public void setup() throws Exception {
-        super.createTestMetadata();
-    }
-
-    @After
-    public  void after() throws Exception {
-        super.cleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws IOException {
-        if (!useSandbox())
-            return;
-
-        KylinConfig config = getTestConfig();
-        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
-        assertTrue(loaded.size() == toLoad.length);
-        for (String str : toLoad)
-            assertTrue(loaded.contains(str));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 92ea678..6dc9fb7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -56,13 +56,13 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.security.AclPermission;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;