You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/16 05:27:48 UTC
[08/11] incubator-kylin git commit: KYLIN-877 Abstract Input Source
interface and make hive a default impl
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..975d69f
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.*;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.TableSourceFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link SnapshotManager}: builds a snapshot of a source table, reads the
+ * snapshot back by its resource path, and checks that it yields exactly the same
+ * rows, in the same order, as the source table.
+ *
+ * @author yangli9
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    /** Prepares the test metadata and obtains the snapshot manager for the test config. */
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    /** Discards the test metadata created in {@link #setup()}. */
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Builds a snapshot of EDW.TEST_SITES and compares it row by row with the table
+     * it was built from. Both readers must report end-of-data at the same position.
+     */
+    @Test
+    public void basicTest() throws Exception {
+        String tableName = "EDW.TEST_SITES";
+        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
+        ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
+        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
+
+        // NOTE(review): presumably forces the lookup below to reload the snapshot
+        // instead of returning the instance that was just built -- confirm.
+        snapshotMgr.wipeoutCache();
+
+        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
+
+        // compare hive & snapshot
+        // TableReader is Closeable; close both readers even when an assertion fails.
+        TableReader hiveReader = hiveTable.getReader();
+        try {
+            TableReader snapshotReader = snapshot.getReader();
+            try {
+                while (true) {
+                    boolean hiveNext = hiveReader.next();
+                    boolean snapshotNext = snapshotReader.next();
+                    assertEquals(hiveNext, snapshotNext);
+
+                    if (!hiveNext)
+                        break;
+
+                    String[] hiveRow = hiveReader.getRow();
+                    String[] snapshotRow = snapshotReader.getRow();
+                    assertArrayEquals(hiveRow, snapshotRow);
+                }
+            } finally {
+                snapshotReader.close();
+            }
+        } finally {
+            hiveReader.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index e1ab0af..0124b9b 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -24,7 +24,7 @@ import java.util.List;
*/
public interface IJoinedFlatTableDesc {
- public String getTableName(String jobUUID);
+ public String getTableName();
public List<IntermediateColumnDesc> getColumnList();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
deleted file mode 100644
index fe5c2b3..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.util;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Management class to sync hive table metadata with command See main method for
- * how to use the class
- *
- * @author jianliu
- */
-public class HiveSourceTableLoader {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
-
- public static final String OUTPUT_SURFIX = "json";
- public static final String TABLE_FOLDER_NAME = "table";
- public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
- public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
-
- Map<String, Set<String>> db2tables = Maps.newHashMap();
- for (String table : hiveTables) {
- String[] parts = HadoopUtil.parseHiveTableName(table);
- Set<String> set = db2tables.get(parts[0]);
- if (set == null) {
- set = Sets.newHashSet();
- db2tables.put(parts[0], set);
- }
- set.add(parts[1]);
- }
-
- // extract from hive
- Set<String> loadedTables = Sets.newHashSet();
- for (String database : db2tables.keySet()) {
- List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
- loadedTables.addAll(loaded);
- }
-
- return loadedTables;
- }
-
- private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
-
- List<String> loadedTables = Lists.newArrayList();
- MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
- for (String tableName : tables) {
- Table table = null;
- HiveClient hiveClient = new HiveClient();
- List<FieldSchema> partitionFields = null;
- List<FieldSchema> fields = null;
- try {
- table = hiveClient.getHiveTable(database, tableName);
- partitionFields = table.getPartitionKeys();
- fields = hiveClient.getHiveTableFields(database, tableName);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
- }
-
- if (fields != null && partitionFields != null && partitionFields.size() > 0) {
- fields.addAll(partitionFields);
- }
-
- long tableSize = hiveClient.getFileSizeForTable(table);
- long tableFileNum = hiveClient.getFileNumberForTable(table);
- TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
- if (tableDesc == null) {
- tableDesc = new TableDesc();
- tableDesc.setDatabase(database.toUpperCase());
- tableDesc.setName(tableName.toUpperCase());
- tableDesc.setUuid(UUID.randomUUID().toString());
- tableDesc.setLastModified(0);
- }
-
- int columnNumber = fields.size();
- List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
- for (int i = 0; i < columnNumber; i++) {
- FieldSchema field = fields.get(i);
- ColumnDesc cdesc = new ColumnDesc();
- cdesc.setName(field.getName().toUpperCase());
- cdesc.setDatatype(field.getType());
- cdesc.setId(String.valueOf(i + 1));
- columns.add(cdesc);
- }
- tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
-
- StringBuffer partitionColumnString = new StringBuffer();
- for (int i = 0, n = partitionFields.size(); i < n; i++) {
- if (i > 0)
- partitionColumnString.append(", ");
- partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
- }
-
- Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
-
- if (map == null) {
- map = Maps.newHashMap();
- }
- map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
- map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
- map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
- map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
- map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
- map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
- map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
- map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
- map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
- map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(partitionFields != null && partitionFields.size() > 0).toString());
-
- metaMgr.saveSourceTable(tableDesc);
- metaMgr.saveTableExd(tableDesc.getIdentity(), map);
- loadedTables.add(tableDesc.getIdentity());
- }
-
-
- return loadedTables;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ITableSource.java b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
new file mode 100644
index 0000000..b2cd3b8
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Abstraction of a place tables are read from. An implementation can hand out
+ * readable views of its tables and can present itself through an interface
+ * requested by a build engine.
+ */
+public interface ITableSource {
+
+    /**
+     * Returns an object of the requested build-engine interface type.
+     *
+     * NOTE(review): behaviour for an interface the source does not support is not
+     * visible here -- confirm against the implementations.
+     *
+     * @param engineInterface the interface the caller wants an instance of
+     * @return an instance of {@code engineInterface}
+     */
+    <T> T adaptToBuildEngine(Class<T> engineInterface);
+
+    /**
+     * Creates a {@link ReadableTable} for the table described by {@code tableDesc}.
+     *
+     * @param tableDesc description of the table to read
+     * @return a readable view of that table
+     */
+    ReadableTable createReadableTable(TableDesc tableDesc);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
new file mode 100644
index 0000000..19c7790
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
+/**
+ * A table whose rows can be read sequentially as string arrays, plus a signature
+ * that can be compared to detect modifications of the table.
+ */
+public interface ReadableTable {
+
+    /** Returns a reader to read the table. */
+    public TableReader getReader() throws IOException;
+
+    /** Used to detect table modifications mainly. Return null in case table does not exist. */
+    public TableSignature getSignature() throws IOException;
+
+    /** Forward-only cursor over the rows of a table. It is {@link Closeable}; close it after use. */
+    public interface TableReader extends Closeable {
+
+        /** Move to the next row, return false if no more record. */
+        public boolean next() throws IOException;
+
+        /** Get the current row. */
+        public String[] getRow();
+
+    }
+
+    // ============================================================================
+
+    /**
+     * Value object holding a table's path, size and last-modified time. Two signatures
+     * are equal only when all three fields are equal.
+     *
+     * Jackson auto-detection is switched off for fields, getters and setters, so only
+     * the members explicitly annotated with {@code @JsonProperty} take part in JSON
+     * (de)serialization.
+     */
+    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+    public class TableSignature {
+
+        @JsonProperty("path")
+        private String path;
+        @JsonProperty("size")
+        private long size;
+        @JsonProperty("last_modified_time")
+        private long lastModifiedTime;
+
+        // for JSON serialization
+        public TableSignature() {
+        }
+
+        public TableSignature(String path, long size, long lastModifiedTime) {
+            this.path = path;
+            this.size = size;
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setLastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public long getLastModifiedTime() {
+            return lastModifiedTime;
+        }
+
+        /** Combines all three fields; consistent with {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            // fold each long into an int by xor-ing its high and low 32 bits
+            result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
+            result = prime * result + ((path == null) ? 0 : path.hashCode());
+            result = prime * result + (int) (size ^ (size >>> 32));
+            return result;
+        }
+
+        /** Equal when {@code obj} is of exactly this class and path, size and last-modified time all match. */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+            TableSignature other = (TableSignature) obj;
+            if (lastModifiedTime != other.lastModifiedTime || size != other.size)
+                return false;
+            // path may be null on either side
+            return path == null ? other.path == null : path.equals(other.path);
+        }
+
+        @Override
+        public String toString() {
+            // label matches the class name (was the stale "FileSignature")
+            return "TableSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
new file mode 100644
index 0000000..ec75c44
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.TableDesc;
+
+public class TableSourceFactory {
+
+ private static ITableSource dft = (ITableSource) ClassUtil.newInstance("org.apache.kylin.source.hive.HiveTableSource");
+
+ public static ReadableTable createReadableTable(TableDesc table) {
+ return dft.createReadableTable(table);
+ }
+
+ public static <T> T createEngineAdapter(TableDesc table, Class<T> engineInterface) {
+ return dft.adaptToBuildEngine(engineInterface);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
deleted file mode 100644
index f357855..0000000
--- a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.tool;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
- @Before
- public void setup() throws Exception {
- super.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- super.cleanupTestMetadata();
- }
-
- @Test
- public void test() throws IOException {
- if (!useSandbox())
- return;
-
- KylinConfig config = getTestConfig();
- String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
- assertTrue(loaded.size() == toLoad.length);
- for (String str : toLoad)
- assertTrue(loaded.contains(str));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 92ea678..6dc9fb7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -56,13 +56,13 @@ import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.MetricsRequest;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.security.AclPermission;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;