You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/15 08:33:44 UTC
[1/3] incubator-kylin git commit: KYLIN-877 Abstract Input Source
interface and make hive a default impl
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 111d8d206 -> d109e27bb
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..975d69f
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.*;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.TableSourceFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test that builds a snapshot from a source table, reloads it through the
+ * {@link SnapshotManager} and checks that the reloaded snapshot yields exactly
+ * the same rows, in the same order, as the source table.
+ *
+ * @author yangli9
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Builds a snapshot of {@code EDW.TEST_SITES}, wipes the manager's cache,
+     * fetches the snapshot again by its resource path and compares it row by
+     * row against the source table.
+     */
+    @Test
+    public void basicTest() throws Exception {
+        String tableName = "EDW.TEST_SITES";
+        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
+        ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
+        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
+
+        // NOTE(review): presumably forces the next lookup to reload the snapshot
+        // from storage instead of returning the instance just built -- confirm.
+        snapshotMgr.wipeoutCache();
+
+        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
+
+        // compare hive & snapshot; both readers are Closeable and must be released
+        TableReader hiveReader = hiveTable.getReader();
+        try {
+            TableReader snapshotReader = snapshot.getReader();
+            try {
+                while (true) {
+                    boolean hiveNext = hiveReader.next();
+                    boolean snapshotNext = snapshotReader.next();
+                    // both sides must run out of rows at the same time
+                    assertEquals(hiveNext, snapshotNext);
+
+                    if (!hiveNext)
+                        break;
+
+                    String[] hiveRow = hiveReader.getRow();
+                    String[] snapshotRow = snapshotReader.getRow();
+                    assertArrayEquals(hiveRow, snapshotRow);
+                }
+            } finally {
+                snapshotReader.close();
+            }
+        } finally {
+            hiveReader.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index e1ab0af..0124b9b 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -24,7 +24,7 @@ import java.util.List;
*/
public interface IJoinedFlatTableDesc {
- public String getTableName(String jobUUID);
+ public String getTableName();
public List<IntermediateColumnDesc> getColumnList();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
deleted file mode 100644
index fe5c2b3..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.util;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Management class to sync hive table metadata with command See main method for
- * how to use the class
- *
- * @author jianliu
- */
-public class HiveSourceTableLoader {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
-
- public static final String OUTPUT_SURFIX = "json";
- public static final String TABLE_FOLDER_NAME = "table";
- public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
- public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
-
- Map<String, Set<String>> db2tables = Maps.newHashMap();
- for (String table : hiveTables) {
- String[] parts = HadoopUtil.parseHiveTableName(table);
- Set<String> set = db2tables.get(parts[0]);
- if (set == null) {
- set = Sets.newHashSet();
- db2tables.put(parts[0], set);
- }
- set.add(parts[1]);
- }
-
- // extract from hive
- Set<String> loadedTables = Sets.newHashSet();
- for (String database : db2tables.keySet()) {
- List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
- loadedTables.addAll(loaded);
- }
-
- return loadedTables;
- }
-
- private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
-
- List<String> loadedTables = Lists.newArrayList();
- MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
- for (String tableName : tables) {
- Table table = null;
- HiveClient hiveClient = new HiveClient();
- List<FieldSchema> partitionFields = null;
- List<FieldSchema> fields = null;
- try {
- table = hiveClient.getHiveTable(database, tableName);
- partitionFields = table.getPartitionKeys();
- fields = hiveClient.getHiveTableFields(database, tableName);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
- }
-
- if (fields != null && partitionFields != null && partitionFields.size() > 0) {
- fields.addAll(partitionFields);
- }
-
- long tableSize = hiveClient.getFileSizeForTable(table);
- long tableFileNum = hiveClient.getFileNumberForTable(table);
- TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
- if (tableDesc == null) {
- tableDesc = new TableDesc();
- tableDesc.setDatabase(database.toUpperCase());
- tableDesc.setName(tableName.toUpperCase());
- tableDesc.setUuid(UUID.randomUUID().toString());
- tableDesc.setLastModified(0);
- }
-
- int columnNumber = fields.size();
- List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
- for (int i = 0; i < columnNumber; i++) {
- FieldSchema field = fields.get(i);
- ColumnDesc cdesc = new ColumnDesc();
- cdesc.setName(field.getName().toUpperCase());
- cdesc.setDatatype(field.getType());
- cdesc.setId(String.valueOf(i + 1));
- columns.add(cdesc);
- }
- tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
-
- StringBuffer partitionColumnString = new StringBuffer();
- for (int i = 0, n = partitionFields.size(); i < n; i++) {
- if (i > 0)
- partitionColumnString.append(", ");
- partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
- }
-
- Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
-
- if (map == null) {
- map = Maps.newHashMap();
- }
- map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
- map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
- map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
- map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
- map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
- map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
- map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
- map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
- map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
- map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(partitionFields != null && partitionFields.size() > 0).toString());
-
- metaMgr.saveSourceTable(tableDesc);
- metaMgr.saveTableExd(tableDesc.getIdentity(), map);
- loadedTables.add(tableDesc.getIdentity());
- }
-
-
- return loadedTables;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ITableSource.java b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
new file mode 100644
index 0000000..b2cd3b8
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Abstraction of an input source of tables. An implementation can hand out a
+ * {@link ReadableTable} for a table description and can adapt itself to an
+ * interface requested by a build engine.
+ */
+public interface ITableSource {
+
+    /**
+     * Returns an object of the requested type for use by a build engine.
+     *
+     * @param engineInterface the interface the caller wants an instance of
+     * @return an instance of {@code engineInterface}; NOTE(review): behaviour for
+     *         an unsupported interface is implementation-defined -- confirm
+     */
+    <E> E adaptToBuildEngine(Class<E> engineInterface);
+
+    /**
+     * Creates a readable view of the table described by {@code tableDesc}.
+     *
+     * @param tableDesc description of the table to read
+     * @return a {@link ReadableTable} for that table
+     */
+    ReadableTable createReadableTable(TableDesc tableDesc);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
new file mode 100644
index 0000000..19c7790
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
+/**
+ * A table whose rows can be read sequentially as string arrays, together with
+ * a signature that can be used to detect modifications of the table.
+ */
+public interface ReadableTable {
+
+    /**
+     * Returns a reader to read the table.
+     *
+     * @return a reader over the table rows; it is {@link Closeable} and should
+     *         be closed by the caller when no longer needed
+     * @throws IOException declared for implementations that perform I/O
+     */
+    public TableReader getReader() throws IOException;
+
+    /**
+     * Used to detect table modifications mainly.
+     *
+     * @return the signature of the table, or {@code null} in case the table does not exist
+     * @throws IOException declared for implementations that perform I/O
+     */
+    public TableSignature getSignature() throws IOException;
+
+    /** Forward-only cursor over the rows of a table. */
+    public interface TableReader extends Closeable {
+
+        /**
+         * Moves to the next row.
+         *
+         * @return {@code false} if there are no more records
+         */
+        public boolean next() throws IOException;
+
+        /** Gets the current row. */
+        public String[] getRow();
+
+    }
+
+    // ============================================================================
+
+    /**
+     * Value object made of a path, a size and a last-modified time. Two
+     * signatures are equal when all three fields are equal. JSON auto-detection
+     * is switched off, so only the fields annotated with {@link JsonProperty}
+     * are (de)serialized.
+     */
+    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+    public class TableSignature {
+
+        @JsonProperty("path")
+        private String path;
+        @JsonProperty("size")
+        private long size;
+        @JsonProperty("last_modified_time")
+        private long lastModifiedTime;
+
+        // for JSON serialization
+        public TableSignature() {
+        }
+
+        public TableSignature(String path, long size, long lastModifiedTime) {
+            this.path = path;
+            this.size = size;
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setLastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public long getLastModifiedTime() {
+            return lastModifiedTime;
+        }
+
+        /** Hash over last-modified time, path and size, consistent with {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
+            result = prime * result + ((path == null) ? 0 : path.hashCode());
+            result = prime * result + (int) (size ^ (size >>> 32));
+            return result;
+        }
+
+        /**
+         * Equal only to another object of exactly the same class whose path
+         * (null-safe), size and last-modified time all match.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+            TableSignature other = (TableSignature) obj;
+            boolean samePath = (path == null) ? other.path == null : path.equals(other.path);
+            return lastModifiedTime == other.lastModifiedTime && samePath && size == other.size;
+        }
+
+        /** Debug representation; labelled with the actual class name. */
+        @Override
+        public String toString() {
+            return "TableSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
new file mode 100644
index 0000000..ec75c44
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Static entry point for obtaining table-source services. All calls are
+ * delegated to a single default {@link ITableSource}, which is instantiated by
+ * class name when this class is loaded.
+ */
+public class TableSourceFactory {
+
+    // Created reflectively by name; NOTE(review): presumably so that this module
+    // needs no compile-time dependency on the hive implementation -- confirm.
+    private static final ITableSource dft = (ITableSource) ClassUtil.newInstance("org.apache.kylin.source.hive.HiveTableSource");
+
+    /**
+     * Creates a readable view of the given table using the default source.
+     *
+     * @param table description of the table to read
+     * @return whatever the default source's {@code createReadableTable} returns
+     */
+    public static ReadableTable createReadableTable(TableDesc table) {
+        return dft.createReadableTable(table);
+    }
+
+    /**
+     * Asks the default source to adapt itself to the given build-engine interface.
+     *
+     * @param table currently ignored; the default source is used for every table
+     * @param engineInterface the interface the build engine expects
+     * @return whatever the default source's {@code adaptToBuildEngine} returns
+     */
+    public static <T> T createEngineAdapter(TableDesc table, Class<T> engineInterface) {
+        return dft.adaptToBuildEngine(engineInterface);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
deleted file mode 100644
index f357855..0000000
--- a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.tool;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
- @Before
- public void setup() throws Exception {
- super.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- super.cleanupTestMetadata();
- }
-
- @Test
- public void test() throws IOException {
- if (!useSandbox())
- return;
-
- KylinConfig config = getTestConfig();
- String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
- assertTrue(loaded.size() == toLoad.length);
- for (String str : toLoad)
- assertTrue(loaded.contains(str));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 92ea678..6dc9fb7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -56,13 +56,13 @@ import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.MetricsRequest;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.security.AclPermission;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
[3/3] incubator-kylin git commit: KYLIN-877 Abstract Input Source
interface and make hive a default impl
Posted by li...@apache.org.
KYLIN-877 Abstract Input Source interface and make hive a default impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d109e27b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d109e27b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d109e27b
Branch: refs/heads/0.8
Commit: d109e27bb37f0dac6bdc318d472b1f739c7df794
Parents: 111d8d2
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Jul 15 14:32:14 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jul 15 14:33:11 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeBuilder.java | 2 -
.../java/org/apache/kylin/cube/CubeManager.java | 38 ++--
.../org/apache/kylin/cube/model/CubeDesc.java | 2 +-
.../cube/model/CubeJoinedFlatTableDesc.java | 5 +-
.../apache/kylin/dict/DictionaryGenerator.java | 4 +-
.../org/apache/kylin/dict/DictionaryInfo.java | 2 +-
.../apache/kylin/dict/DictionaryManager.java | 8 +-
.../org/apache/kylin/dict/lookup/FileTable.java | 1 +
.../kylin/dict/lookup/FileTableReader.java | 2 +-
.../org/apache/kylin/dict/lookup/HiveTable.java | 100 -----------
.../kylin/dict/lookup/HiveTableReader.java | 175 ------------------
.../kylin/dict/lookup/LookupBytesTable.java | 2 +-
.../kylin/dict/lookup/LookupStringTable.java | 1 +
.../apache/kylin/dict/lookup/LookupTable.java | 3 +-
.../apache/kylin/dict/lookup/ReadableTable.java | 133 --------------
.../apache/kylin/dict/lookup/SnapshotCLI.java | 3 +-
.../kylin/dict/lookup/SnapshotManager.java | 3 +-
.../apache/kylin/dict/lookup/SnapshotTable.java | 2 +
.../kylin/dict/ITHiveTableReaderTest.java | 51 ------
.../kylin/dict/ITSnapshotManagerTest.java | 81 ---------
.../model/IIJoinedFlatTableDesc.java | 4 +-
.../apache/kylin/engine/BuildEngineFactory.java | 1 -
.../kylin/engine/mr/BatchCubingJobBuilder.java | 23 +--
.../kylin/engine/mr/GarbageCollectionStep.java | 1 +
.../org/apache/kylin/engine/mr/IMRInput.java | 26 ++-
.../kylin/engine/mr/JobBuilderSupport.java | 36 +---
.../kylin/engine/mr/MRBatchCubingEngine.java | 27 +++
.../apache/kylin/job/AbstractJobBuilder.java | 64 -------
.../org/apache/kylin/job/JoinedFlatTable.java | 18 +-
.../kylin/job/hadoop/AbstractHadoopJob.java | 81 +++++----
.../cardinality/ColumnCardinalityMapper.java | 48 ++---
.../cardinality/HiveColumnCardinalityJob.java | 15 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 25 ++-
.../cube/FactDistinctColumnsMapperBase.java | 23 ++-
.../cube/FactDistinctHiveColumnsMapper.java | 48 +++--
.../cube/FactDistinctIIColumnsMapper.java | 126 -------------
.../job/hadoop/cube/NewBaseCuboidMapper.java | 21 +--
.../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 24 +--
.../job/hadoop/cubev2/InMemCuboidMapper.java | 17 +-
.../hadoop/invertedindex/IIFlattenHiveJob.java | 94 ----------
.../kylin/job/invertedindex/IIJobBuilder.java | 6 +-
.../kylin/job/streaming/CubeStreamConsumer.java | 2 +-
.../org/apache/kylin/source/ITableSource.java | 24 ---
.../apache/kylin/source/hive/HiveMRInput.java | 150 +++++++++++++++-
.../source/hive/HiveSourceTableLoader.java | 155 ++++++++++++++++
.../org/apache/kylin/source/hive/HiveTable.java | 100 +++++++++++
.../kylin/source/hive/HiveTableReader.java | 176 +++++++++++++++++++
.../kylin/source/hive/HiveTableSource.java | 7 +
.../apache/kylin/job/BuildIIWithStreamTest.java | 45 +++--
.../job/hadoop/cube/MergeCuboidMapperTest.java | 2 +-
.../job/hadoop/hive/JoinedFlatTableTest.java | 6 +-
.../kylin/job/hadoop/invertedindex/IITest.java | 67 +++----
.../job/tools/ColumnCardinalityJobTest.java | 63 -------
.../job/tools/ColumnCardinalityMapperTest.java | 126 -------------
.../job/tools/ColumnCardinalityReducerTest.java | 12 +-
.../hive/ITHiveSourceTableLoaderTest.java | 59 +++++++
.../source/hive/ITHiveTableReaderTest.java | 51 ++++++
.../source/hive/ITSnapshotManagerTest.java | 82 +++++++++
.../metadata/model/IJoinedFlatTableDesc.java | 2 +-
.../metadata/util/HiveSourceTableLoader.java | 155 ----------------
.../org/apache/kylin/source/ITableSource.java | 28 +++
.../org/apache/kylin/source/ReadableTable.java | 133 ++++++++++++++
.../apache/kylin/source/TableSourceFactory.java | 35 ++++
.../tool/ITHiveSourceTableLoaderTest.java | 59 -------
.../apache/kylin/rest/service/CubeService.java | 2 +-
65 files changed, 1308 insertions(+), 1579 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
index 538f9c9..a828252 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
@@ -20,8 +20,6 @@ package org.apache.kylin.cube;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import java.util.List;
-
/**
*/
public class CubeBuilder {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index ff4a0ae..90fade3 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -18,11 +18,19 @@
package org.apache.kylin.cube;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -36,7 +44,6 @@ import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.HiveTable;
import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
@@ -46,14 +53,21 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.*;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.metadata.realization.IRealizationProvider;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.TableSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
/**
* @author yangli9
@@ -192,8 +206,8 @@ public class CubeManager implements IRealizationProvider {
MetadataManager metaMgr = getMetadataManager();
SnapshotManager snapshotMgr = getSnapshotManager();
- HiveTable hiveTable = new HiveTable(metaMgr, lookupTable);
TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
+ ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 24f9db4..56bb78f 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -326,7 +326,7 @@ public class CubeDesc extends RootPersistentEntity {
public String getFactTable() {
return model.getFactTable().toUpperCase();
}
-
+
public String[] getNullStrings() {
return nullStrings;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 0a8b8ed..960ddca 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -11,7 +11,6 @@ import java.util.List;
import java.util.Map;
/**
- * @author George Song (ysong1)
*/
public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
@@ -127,8 +126,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
}
@Override
- public String getTableName(String jobUUID) {
- return tableName + "_" + jobUUID.replace("-", "_");
+ public String getTableName() {
+ return tableName;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 60b2a8d..d022885 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -31,9 +31,9 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index 802f6e7..2eca3ea 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
+import org.apache.kylin.source.ReadableTable.TableSignature;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class DictionaryInfo extends RootPersistentEntity {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 2e1e3db..611251a 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -35,13 +35,13 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.apache.kylin.source.TableSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -272,7 +272,7 @@ public class DictionaryManager {
if (model.isFactTable(col.getTable())) {
table = new FileTable(factColumnsPath + "/" + col.getName(), -1);
} else {
- table = new HiveTable(metaMgr, col.getTable());
+ table = TableSourceFactory.createReadableTable(metaMgr.getTableDesc(col.getTable()));
}
}
// otherwise could refer to a data set, e.g. common_indicators.txt
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index 92f0c10..59eca4a 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.source.ReadableTable;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
index bf46963..4e04c93 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
+import org.apache.kylin.source.ReadableTable.TableReader;
/**
* Tables are typically CSV or SEQ file.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
deleted file mode 100644
index 68e9b82..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class HiveTable implements ReadableTable {
-
- private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
-
- final private String database;
- final private String hiveTable;
-
- private HiveClient hiveClient;
-
- public HiveTable(MetadataManager metaMgr, String table) {
- TableDesc tableDesc = metaMgr.getTableDesc(table);
- this.database = tableDesc.getDatabase();
- this.hiveTable = tableDesc.getName();
- }
-
- @Override
- public TableReader getReader() throws IOException {
- return new HiveTableReader(database, hiveTable);
- }
-
- @Override
- public TableSignature getSignature() throws IOException {
- try {
- String path = computeHDFSLocation();
- Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
- long size = sizeAndLastModified.getFirst();
- long lastModified = sizeAndLastModified.getSecond();
-
- // for non-native hive table, cannot rely on size & last modified on HDFS
- if (getHiveClient().isNativeTable(database, hiveTable) == false) {
- lastModified = System.currentTimeMillis(); // assume table is ever changing
- }
-
- return new TableSignature(path, size, lastModified);
-
- } catch (Exception e) {
- if (e instanceof IOException)
- throw (IOException) e;
- else
- throw new IOException(e);
- }
- }
-
- private String computeHDFSLocation() throws Exception {
-
- String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
- if (override != null) {
- logger.debug("Override hive table location " + hiveTable + " -- " + override);
- return override;
- }
-
- return getHiveClient().getHiveTableLocation(database, hiveTable);
- }
-
- public HiveClient getHiveClient() {
-
- if (hiveClient == null) {
- hiveClient = new HiveClient();
- }
- return hiveClient;
- }
-
- @Override
- public String toString() {
- return "hive: database=[" + database + "], table=[" + hiveTable + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
deleted file mode 100644
index 96aa0d1..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
-import org.apache.hive.hcatalog.data.transfer.HCatReader;
-import org.apache.hive.hcatalog.data.transfer.ReadEntity;
-import org.apache.hive.hcatalog.data.transfer.ReaderContext;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
-
-/**
- * An implementation of TableReader with HCatalog for Hive table.
- */
-public class HiveTableReader implements TableReader {
-
- private String dbName;
- private String tableName;
- private int currentSplit = -1;
- private ReaderContext readCntxt = null;
- private Iterator<HCatRecord> currentHCatRecordItr = null;
- private HCatRecord currentHCatRecord;
- private int numberOfSplits = 0;
- private Map<String, String> partitionKV = null;
-
- /**
- * Constructor for reading whole hive table
- * @param dbName
- * @param tableName
- * @throws IOException
- */
- public HiveTableReader(String dbName, String tableName) throws IOException {
- this(dbName, tableName, null);
- }
-
- /**
- * Constructor for reading a partition of the hive table
- * @param dbName
- * @param tableName
- * @param partitionKV key-value pairs condition on the partition
- * @throws IOException
- */
- public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
- this.dbName = dbName;
- this.tableName = tableName;
- this.partitionKV = partitionKV;
- initialize();
- }
-
- private void initialize() throws IOException {
- try {
- this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
- }
-
- this.numberOfSplits = readCntxt.numSplits();
-
-// HCatTableInfo tableInfo = HCatTableInfo.
-// HCatSchema schema = HCatBaseInputFormat.getTableSchema(context.getConfiguration);
- }
-
- @Override
- public boolean next() throws IOException {
-
- while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
- currentSplit++;
- if (currentSplit == numberOfSplits) {
- return false;
- }
-
- currentHCatRecordItr = loadHCatRecordItr(readCntxt, currentSplit);
- }
-
- currentHCatRecord = currentHCatRecordItr.next();
-
- return true;
- }
-
- @Override
- public String[] getRow() {
- return getRowAsStringArray(currentHCatRecord);
- }
-
- public List<String> getRowAsList() {
- return getRowAsList(currentHCatRecord);
- }
-
- public static List<String> getRowAsList(HCatRecord record) {
- List<String> rowValues = new ArrayList<String>(record.size());
- return getRowAsList(record, rowValues);
- }
-
- public static List<String> getRowAsList(HCatRecord record, List<String> rowValues) {
- List<Object> allFields = record.getAll();
- for (Object o : allFields) {
- rowValues.add(o != null ? o.toString() : null);
- }
-
- return rowValues;
- }
-
- public static String[] getRowAsStringArray(HCatRecord record) {
- List<String> row = getRowAsList(record);
-
- return row.toArray(new String[row.size()]);
- }
-
- @Override
- public void close() throws IOException {
- this.readCntxt = null;
- this.currentHCatRecordItr = null;
- this.currentHCatRecord = null;
- this.currentSplit = -1;
- }
-
- public String toString() {
- return "hive table reader for: " + dbName + "." + tableName;
- }
-
- private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
- HiveConf hiveConf = new HiveConf(HiveTableReader.class);
- Iterator<Entry<String, String>> itr = hiveConf.iterator();
- Map<String, String> map = new HashMap<String, String>();
- while (itr.hasNext()) {
- Entry<String, String> kv = itr.next();
- map.put(kv.getKey(), kv.getValue());
- }
-
- ReadEntity entity;
- if (partitionKV == null || partitionKV.size() == 0) {
- entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
- } else {
- entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
- }
-
- HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
- ReaderContext cntxt = reader.prepareRead();
-
- return cntxt;
- }
-
- private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException {
- HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
-
- return currentHCatReader.read();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
index ee71e78..c5a75f5 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
@@ -21,9 +21,9 @@ package org.apache.kylin.dict.lookup;
import java.io.IOException;
import org.apache.kylin.common.util.Bytes;
-
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index f9b9779..2d92d68 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
import java.io.IOException;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 7e83197..fd0c37f 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -30,8 +30,9 @@ import org.apache.kylin.common.util.Pair;
import com.google.common.collect.Sets;
import org.apache.kylin.common.util.Array;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
/**
* An in-memory lookup table, in which each cell is an object of type T. The
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
deleted file mode 100644
index 2e6af14..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-
-/**
- */
-public interface ReadableTable {
-
- /** Returns a reader to read the table. */
- public TableReader getReader() throws IOException;
-
- /** Used to detect table modifications mainly. Return null in case table does not exist. */
- public TableSignature getSignature() throws IOException;
-
- public interface TableReader extends Closeable {
-
- /** Move to the next row, return false if no more record. */
- public boolean next() throws IOException;
-
- /** Get the current row. */
- public String[] getRow();
-
- }
-
- // ============================================================================
-
- @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
- public class TableSignature {
-
- @JsonProperty("path")
- private String path;
- @JsonProperty("size")
- private long size;
- @JsonProperty("last_modified_time")
- private long lastModifiedTime;
-
- // for JSON serialization
- public TableSignature() {
- }
-
- public TableSignature(String path, long size, long lastModifiedTime) {
- super();
- this.path = path;
- this.size = size;
- this.lastModifiedTime = lastModifiedTime;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- public void setSize(long size) {
- this.size = size;
- }
-
- public void setLastModifiedTime(long lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- public String getPath() {
- return path;
- }
-
- public long getSize() {
- return size;
- }
-
- public long getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
- result = prime * result + ((path == null) ? 0 : path.hashCode());
- result = prime * result + (int) (size ^ (size >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TableSignature other = (TableSignature) obj;
- if (lastModifiedTime != other.lastModifiedTime)
- return false;
- if (path == null) {
- if (other.path != null)
- return false;
- } else if (!path.equals(other.path))
- return false;
- if (size != other.size)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index bb0f9ce..01c4fbd 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -5,6 +5,7 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
public class SnapshotCLI {
@@ -22,7 +23,7 @@ public class SnapshotCLI {
if (tableDesc == null)
throw new IllegalArgumentException("Not table found by " + table);
- SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(new HiveTable(metaMgr, table), tableDesc, overwriteUUID);
+ SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(TableSourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
System.out.println("resource path updated: " + snapshot.getResourcePath());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index e9d74bb..7822154 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -26,9 +26,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index aa46212..e2205b9 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -30,6 +31,7 @@ import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TrieDictionary;
import org.apache.kylin.dict.TrieDictionaryBuilder;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
import java.io.DataInput;
import java.io.DataOutput;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java b/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java
deleted file mode 100644
index 8559f8b..0000000
--- a/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- * This test case need the hive runtime; Please run it with sandbox;
- * @author shaoshi
- *
- * It is in the exclude list of default profile in pom.xml
- */
-public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
-
-
- @Test
- public void test() throws IOException {
- HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
- int rowNumber = 0;
- while (reader.next()) {
- String[] row = reader.getRow();
- Assert.assertEquals(9, row.length);
- //System.out.println(ArrayUtils.toString(row));
- rowNumber++;
- }
-
- reader.close();
- Assert.assertEquals(10000, rowNumber);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
deleted file mode 100644
index 7fc37a6..0000000
--- a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.*;
-
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author yangli9
- *
- */
-public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
-
- SnapshotManager snapshotMgr;
-
- @Before
- public void setup() throws Exception {
- createTestMetadata();
- snapshotMgr = SnapshotManager.getInstance(getTestConfig());
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void basicTest() throws Exception {
- String tableName = "EDW.TEST_SITES";
- HiveTable hiveTable = new HiveTable(MetadataManager.getInstance(getTestConfig()), tableName);
- TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
- String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
-
- snapshotMgr.wipeoutCache();
-
- SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
-
- // compare hive & snapshot
- TableReader hiveReader = hiveTable.getReader();
- TableReader snapshotReader = snapshot.getReader();
-
- while (true) {
- boolean hiveNext = hiveReader.next();
- boolean snapshotNext = snapshotReader.next();
- assertEquals(hiveNext, snapshotNext);
-
- if (hiveNext == false)
- break;
-
- String[] hiveRow = hiveReader.getRow();
- String[] snapshotRow = snapshotReader.getRow();
- assertArrayEquals(hiveRow, snapshotRow);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
index b01ffa7..4ee1ccb 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
@@ -51,8 +51,8 @@ public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc {
}
@Override
- public String getTableName(String jobUUID) {
- return tableName + "_" + jobUUID.replace("-", "_");
+ public String getTableName() {
+ return tableName + "_" + "II_Flat";
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index 0e31f6b..c536deb 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -36,5 +36,4 @@ public class BuildEngineFactory {
return defaultBatch.createBatchMergeJob(mergeSegment, submitter);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 8f16775..c1524e7 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -31,17 +32,12 @@ import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
public class BatchCubingJobBuilder extends JobBuilderSupport {
-
- public static interface IMRBuildInputParticipant {
-
- }
-
- public static interface IMRBuildOutputParticipant {
-
- }
+
+ private final IMRBatchCubingInputSide inputSide;
public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
+ this.inputSide = MRBatchCubingEngine.getBatchCubingInputSide(seg);
}
public CubingJob build() {
@@ -50,7 +46,8 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
final String cuboidRootPath = getCuboidRootPath(jobId);
final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
- result.addTask(createFlatHiveTableStep(flatHiveTableDesc, jobId));
+ // Phase 1: Create Flat Table
+ inputSide.addStepPhase1_CreateFlatTable(result);
result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
result.addTask(createBuildDictionaryStep(jobId));
@@ -85,9 +82,9 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
result.addTask(createBulkLoadStep(jobId));
}
+ // Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-
- result.addTask(createGarbageCollectionStep(null, flatHiveTableDesc.getTableName(jobId)));
+ inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);
return result;
}
@@ -105,7 +102,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
- appendExecCmdParameters(cmd, "tablename", flatHiveTableDesc.getTableName(jobId));
result.setMapReduceParams(cmd.toString());
return result;
@@ -192,7 +188,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "level", "0");
- appendExecCmdParameters(cmd, "tablename", flatHiveTableDesc.getTableName(jobId));
appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
baseCuboidStep.setMapReduceParams(cmd.toString());
@@ -211,7 +206,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
}
private String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
- return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName(jobId);
+ return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
}
private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
index 2f7bf21..d79f35d 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
@@ -44,6 +44,7 @@ import com.google.common.collect.Lists;
/**
* Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
*/
+@Deprecated // only exists for backward compatibility
public class GarbageCollectionStep extends AbstractExecutable {
private static final String OLD_HTABLES = "oldHTables";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 3097a4b..c170b47 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -18,8 +18,30 @@
package org.apache.kylin.engine.mr;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.TableDesc;
+
public interface IMRInput {
-
- public IMRJobFlowParticipant createBuildFlowParticipant();
+ public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+
+ public IMRTableInputFormat getTableInputFormat(TableDesc table);
+
+ public interface IMRTableInputFormat {
+
+ public void configureJob(Job job);
+
+ public String[] parseMapperInput(Object mapperInput);
+ }
+
+ public interface IMRBatchCubingInputSide {
+
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+ public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
+
+ public IMRTableInputFormat getFlatTableInputFormat();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index b1f3c4e..79430f6 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,18 +23,14 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import com.google.common.base.Preconditions;
@@ -54,10 +50,6 @@ abstract public class JobBuilderSupport {
this.submitter = submitter;
}
- protected AbstractExecutable createFlatHiveTableStep(IJoinedFlatTableDesc flatTableDesc, String jobId) {
- return createFlatHiveTableStep(config, flatTableDesc, jobId);
- }
-
protected MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
@@ -174,35 +166,9 @@ abstract public class JobBuilderSupport {
}
// ============================================================================
- // static methods also shared by IIJobBuilder
+ // static methods also shared by other job flow participant
// ----------------------------------------------------------------------------
- public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
-
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc, jobId);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, getJobWorkingDir(conf, jobId), jobId);
- String insertDataHqls;
- try {
- insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, jobId, conf);
- } catch (IOException e1) {
- e1.printStackTrace();
- throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
- }
-
- ShellExecutable step = new ShellExecutable();
- StringBuffer buf = new StringBuffer();
- buf.append("hive -e \"");
- buf.append(dropTableHql + "\n");
- buf.append(createTableHql + "\n");
- buf.append(insertDataHqls + "\n");
- buf.append("\"");
-
- step.setCmd(buf.toString());
- step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-
- return step;
- }
-
public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
return conf.getHdfsWorkingDirectory() + "/" + "kylin-" + jobId;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index 332f20e..b374a99 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -18,9 +18,15 @@
package org.apache.kylin.engine.mr;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
public class MRBatchCubingEngine implements IBatchCubingEngine {
@@ -33,5 +39,26 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
return new BatchMergeJobBuilder(mergeSegment, submitter).build();
}
+
+ public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
+ return getMRInput(tableDesc).getBatchCubingInputSide(seg);
+ }
+
+ public static IMRTableInputFormat getTableInputFormat(String tableName) {
+ return getTableInputFormat(getTableDesc(tableName));
+ }
+
+ public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+ return getMRInput(tableDesc).getTableInputFormat(tableDesc);
+ }
+
+ private static IMRInput getMRInput(TableDesc tableDesc) {
+ return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+ }
+
+ private static TableDesc getTableDesc(String tableName) {
+ return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
deleted file mode 100644
index 48766b7..0000000
--- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.IOException;
-
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public abstract class AbstractJobBuilder {
-
- protected static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
- protected JobEngineConfig engineConfig;
- protected String submitter;
-
- public AbstractJobBuilder(JobEngineConfig engineConfig) {
- this.engineConfig = engineConfig;
- }
-
- public AbstractJobBuilder setSubmitter(String submitter) {
- this.submitter = submitter;
- return this;
- }
-
- public String getSubmitter() {
- return submitter;
- }
-
- protected StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) {
- return cmd.append(" -").append(paraName).append(" ").append(paraValue);
- }
-
- protected String getIntermediateHiveTableName(IJoinedFlatTableDesc intermediateTableDesc, String jobUuid) {
- return intermediateTableDesc.getTableName(jobUuid);
- }
-
- protected String getIntermediateHiveTableLocation(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
- return getJobWorkingDir(jobUUID) + "/" + intermediateTableDesc.getTableName(jobUUID);
- }
-
- protected String getJobWorkingDir(String uuid) {
- return engineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 283f462..eba6bd4 100644
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -56,14 +56,14 @@ public class JoinedFlatTable {
public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
- public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
- return storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID);
+ public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+ return storageDfsDir + "/" + intermediateTableDesc.getTableName();
}
- public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
+ public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
StringBuilder ddl = new StringBuilder();
- ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName(jobUUID) + "\n");
+ ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
ddl.append("(" + "\n");
for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
@@ -77,19 +77,19 @@ public class JoinedFlatTable {
ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
ddl.append("STORED AS SEQUENCEFILE" + "\n");
- ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID) + "';").append("\n");
+ ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
// ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
// ";\n");
return ddl.toString();
}
- public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
+ public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
StringBuilder ddl = new StringBuilder();
- ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName(jobUUID) + ";").append("\n");
+ ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
return ddl.toString();
}
- public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException {
+ public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
StringBuilder sql = new StringBuilder();
File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
@@ -117,7 +117,7 @@ public class JoinedFlatTable {
}
}
- sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName(jobUUID) + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+ sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
return sql.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 919af67..7d69fd9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -232,52 +232,44 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
- File tmp = File.createTempFile("kylin_job_meta", "");
- tmp.delete(); // we need a directory, so delete the file first
-
- File metaDir = new File(tmp, "meta");
- metaDir.mkdirs();
- metaDir.getParentFile().deleteOnExit();
-
- // write kylin.properties
+ public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
+ File metaDir = new File("meta");
+ System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+ logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- File kylinPropsFile = new File(metaDir, "kylin.properties");
- kylinConfig.writeProperties(kylinPropsFile);
+ kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
+ return kylinConfig;
+ }
+ protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+ ArrayList<String> dumpList = new ArrayList<String>();
+ dumpList.add(table.getResourcePath());
+ attachKylinPropsAndMetadata(dumpList, conf);
+ }
+
+ protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
+ MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
// write cube / model_desc / cube_desc / dict / table
ArrayList<String> dumpList = new ArrayList<String>();
dumpList.add(cube.getResourcePath());
dumpList.add(cube.getDescriptor().getModel().getResourcePath());
dumpList.add(cube.getDescriptor().getResourcePath());
+
for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
- TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
+ TableDesc table = metaMgr.getTableDesc(tableName);
dumpList.add(table.getResourcePath());
}
-
for (CubeSegment segment : cube.getSegments()) {
dumpList.addAll(segment.getDictionaryPaths());
}
-
- dumpResources(kylinConfig, metaDir, dumpList);
-
- // hadoop distributed cache
- conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
+
+ attachKylinPropsAndMetadata(dumpList, conf);
}
protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
- File tmp = File.createTempFile("kylin_job_meta", "");
- tmp.delete(); // we need a directory, so delete the file first
-
- File metaDir = new File(tmp, "meta");
- metaDir.mkdirs();
- metaDir.getParentFile().deleteOnExit();
-
- // write kylin.properties
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- File kylinPropsFile = new File(metaDir, "kylin.properties");
- kylinConfig.writeProperties(kylinPropsFile);
-
+ MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
// write II / model_desc / II_desc / dict / table
ArrayList<String> dumpList = new ArrayList<String>();
dumpList.add(ii.getResourcePath());
@@ -285,14 +277,30 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
dumpList.add(ii.getDescriptor().getResourcePath());
for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
- TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
+ TableDesc table = metaMgr.getTableDesc(tableName);
dumpList.add(table.getResourcePath());
}
-
for (IISegment segment : ii.getSegments()) {
dumpList.addAll(segment.getDictionaryPaths());
}
+ attachKylinPropsAndMetadata(dumpList, conf);
+ }
+
+ private void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
+ File tmp = File.createTempFile("kylin_job_meta", "");
+ tmp.delete(); // we need a directory, so delete the file first
+
+ File metaDir = new File(tmp, "meta");
+ metaDir.mkdirs();
+ metaDir.getParentFile().deleteOnExit();
+
+ // write kylin.properties
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ File kylinPropsFile = new File(metaDir, "kylin.properties");
+ kylinConfig.writeProperties(kylinPropsFile);
+
+ // write resources
dumpResources(kylinConfig, metaDir, dumpList);
// hadoop distributed cache
@@ -346,15 +354,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return input.getSplits(job).size();
}
- public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
- File metaDir = new File("meta");
- System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
- logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
- return kylinConfig;
- }
-
public void kill() throws JobException {
if (job != null) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index b32baf9..8da6cde 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -24,52 +24,60 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
/**
* @author Jack
*
*/
-public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWritable, BytesWritable> {
+public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
public static final String DEFAULT_DELIM = ",";
private int counter = 0;
- private HCatSchema schema = null;
- private int columnSize = 0;
+ private TableDesc tableDesc;
+ private IMRTableInputFormat tableInputFormat;
@Override
protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- schema = HCatInputFormat.getTableSchema(context.getConfiguration());
- columnSize = schema.getFields().size();
+ Configuration conf = context.getConfiguration();
+ bindCurrentConfiguration(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ String tableName = conf.get(BatchConstants.TABLE_NAME);
+ tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
+ tableInputFormat = MRBatchCubingEngine.getTableInputFormat(tableDesc);
}
@Override
- public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException {
-
- HCatFieldSchema field;
- Object fieldValue;
- for (int m = 0; m < columnSize; m++) {
- field = schema.get(m);
- fieldValue = value.get(field.getName(), schema);
+ public void map(T key, Object value, Context context) throws IOException, InterruptedException {
+ ColumnDesc[] columns = tableDesc.getColumns();
+ String[] values = tableInputFormat.parseMapperInput(value);
+
+ for (int m = 0; m < columns.length; m++) {
+ String field = columns[m].getName();
+ String fieldValue = values[m];
if (fieldValue == null)
fieldValue = "NULL";
if (counter < 5 && m < 10) {
- System.out.println("Get row " + counter + " column '" + field.getName() + "' value: " + fieldValue);
+ System.out.println("Get row " + counter + " column '" + field + "' value: " + fieldValue);
}
if (fieldValue != null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index abf3e2f..58fd509 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-
-import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
/**
@@ -69,16 +69,17 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
setJobClasspath(job);
+ String table = getOptionValue(OPTION_TABLE);
+ job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
+
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
FileOutputFormat.setOutputPath(job, output);
job.getConfiguration().set("dfs.block.size", "67108864");
// Mapper
- String table = getOptionValue(OPTION_TABLE);
- String[] dbTableNames = HadoopUtil.parseHiveTableName(table);
- HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
+ IMRTableInputFormat tableInputFormat = MRBatchCubingEngine.getTableInputFormat(table);
+ tableInputFormat.configureJob(job);
- job.setInputFormatClass(HCatInputFormat.class);
job.setMapperClass(ColumnCardinalityMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 5ec963e..77c4f7e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -29,18 +29,19 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author yangli9
*/
public class FactDistinctColumnsJob extends AbstractHadoopJob {
protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
@@ -53,7 +54,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_TABLE_NAME);
options.addOption(OPTION_SEGMENT_NAME);
options.addOption(OPTION_STATISTICS_ENABLED);
options.addOption(OPTION_STATISTICS_OUTPUT);
@@ -63,8 +63,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
- ;
+
String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
@@ -73,7 +72,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
// ----------------------------------------------------------------------------
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -84,11 +83,11 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
setJobClasspath(job);
- setupMapper(intermediateTable);
+ setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
setupReducer(output);
// CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
- attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
return waitForCompletion(job);
@@ -100,12 +99,10 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
}
- private void setupMapper(String intermediateTable) throws IOException {
- String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
- HCatInputFormat.setInput(job, dbTableNames[0],
- dbTableNames[1]);
+ private void setupMapper(CubeSegment cubeSeg) throws IOException {
+ IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
- job.setInputFormatClass(HCatInputFormat.class);
job.setMapperClass(FactDistinctHiveColumnsMapper.class);
job.setCombinerClass(FactDistinctColumnsCombiner.class);
job.setMapOutputKeyClass(LongWritable.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index 6bd25a8..ee8ab3e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -1,35 +1,41 @@
package org.apache.kylin.job.hadoop.cube;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
/**
*/
public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
protected String cubeName;
protected CubeInstance cube;
+ protected CubeSegment cubeSeg;
protected CubeDesc cubeDesc;
protected long baseCuboidId;
protected List<TblColRef> columns;
protected ArrayList<Integer> factDictCols;
+ protected IMRTableInputFormat flatTableInputFormat;
protected LongWritable outputKey = new LongWritable();
protected Text outputValue = new Text();
@@ -43,6 +49,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
cubeDesc = cube.getDescriptor();
baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
columns = Cuboid.findById(cubeDesc, baseCuboidId).getColumns();
@@ -60,11 +67,13 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
factDictCols.add(i);
}
}
+
+ flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
}
- protected void handleErrorRecord(HCatRecord record, Exception ex) throws IOException {
+ protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
- System.err.println("Insane record: " + record.getAll());
+ System.err.println("Insane record: " + Arrays.toString(record));
ex.printStackTrace(System.err);
errorRecordCounter++;
[2/3] incubator-kylin git commit: KYLIN-877 Abstract Input Source
interface and make hive a default impl
Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index a884465..c19af69 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -18,35 +18,30 @@
package org.apache.kylin.job.hadoop.cube;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.lookup.HiveTableReader;
import org.apache.kylin.job.constant.BatchConstants;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
/**
* @author yangli9
*/
-public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
- private HCatSchema schema = null;
private CubeJoinedFlatTableDesc intermediateTableDesc;
protected boolean collectStatistics = false;
@@ -55,7 +50,6 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
private Integer[][] allCuboidsBitSet = null;
private HyperLogLogPlusCounter[] allCuboidsHLL = null;
private Long[] cuboidIds;
- private List<String> rowArray;
private HashFunction hf = null;
private int rowCount = 0;
private int SAMPING_PERCENTAGE = 5;
@@ -64,9 +58,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
@Override
protected void setup(Context context) throws IOException {
super.setup(context);
- schema = HCatInputFormat.getTableSchema(context.getConfiguration());
intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
- rowArray = new ArrayList<String>(schema.getFields().size());
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
@@ -116,13 +108,12 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
}
@Override
- public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
- rowArray.clear();
- HiveTableReader.getRowAsList(record, rowArray);
+ public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+ String[] row = flatTableInputFormat.parseMapperInput(record);
try {
for (int i : factDictCols) {
outputKey.set((long) i);
- String fieldValue = rowArray.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
if (fieldValue == null)
continue;
byte[] bytes = Bytes.toBytes(fieldValue);
@@ -130,24 +121,25 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
context.write(outputKey, outputValue);
}
} catch (Exception ex) {
- handleErrorRecord(record, ex);
+ handleErrorRecord(row, ex);
}
if (collectStatistics && rowCount < SAMPING_PERCENTAGE) {
- putRowKeyToHLL(rowArray);
+ putRowKeyToHLL(row);
}
if (rowCount++ == 100)
rowCount = 0;
}
- private void putRowKeyToHLL(List<String> row) {
+ private void putRowKeyToHLL(String[] row) {
//generate hash for each row key column
for (int i = 0; i < nRowKey; i++) {
Hasher hc = hf.newHasher();
- if (row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]) != null) {
- row_hashcodes[i].set(hc.putString(row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i])).hash().asBytes());
+ String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+ if (colValue != null) {
+ row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
} else {
row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
deleted file mode 100644
index d3c3249..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.*;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
-
- private Queue<IIRow> buffer = Lists.newLinkedList();
- private Iterator<Slice> slices;
-
- private int[] baseCuboidCol2FlattenTableCol;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.setup(context);
-
- Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- String iiName = conf.get(BatchConstants.CFG_II_NAME);
- IIInstance ii = IIManager.getInstance(config).getII(iiName);
- IIDesc iiDesc = ii.getDescriptor();
-
- IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
- TableRecordInfo info = new TableRecordInfo(iiDesc);
- KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
- slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator();
-
- baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
- for (int i = 0; i < factDictCols.size(); ++i) {
- int index = findTblCol(intermediateTableDesc.getColumnList(), columns.get(factDictCols.get(i)));
- baseCuboidCol2FlattenTableCol[i] = index;
- }
- }
-
- private int findTblCol(List<IntermediateColumnDesc> columns, final TblColRef col) {
- return Iterators.indexOf(columns.iterator(), new Predicate<IntermediateColumnDesc>() {
- @Override
- public boolean apply(IntermediateColumnDesc input) {
- return input.getColRef().equals(col);
- }
- });
- }
-
- @Override
- public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException {
- IIRow iiRow = new IIRow();
- for (Cell c : cells.rawCells()) {
- iiRow.updateWith(c);
- }
- buffer.add(iiRow);
-
- if (slices.hasNext()) {
- byte[] vBytesBuffer = null;
- Slice slice = slices.next();
-
- for (RawTableRecord record : slice) {
- for (int i = 0; i < factDictCols.size(); ++i) {
- int baseCuboidIndex = factDictCols.get(i);
- outputKey.set((short) baseCuboidIndex);
- int indexInRecord = baseCuboidCol2FlattenTableCol[i];
-
- Dictionary<?> dictionary = slice.getLocalDictionaries()[indexInRecord];
- if (vBytesBuffer == null || dictionary.getSizeOfValue() > vBytesBuffer.length) {
- vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
- }
-
- int vid = record.getValueID(indexInRecord);
- if (vid == dictionary.nullId()) {
- continue;
- }
- int vBytesSize = dictionary.getValueBytesFromId(vid, vBytesBuffer, 0);
-
- outputValue.set(vBytesBuffer, 0, vBytesSize);
- context.write(outputKey, outputValue);
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index f5ee5b9..20f2474 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -27,19 +27,14 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
import org.apache.hadoop.io.Text;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -49,8 +44,9 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.HiveTable;
import org.apache.kylin.dict.lookup.LookupBytesTable;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -60,6 +56,10 @@ import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.TableSourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author George Song (ysong1),honma
@@ -170,8 +170,9 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
// load lookup tables
if (!lookupTables.containsKey(lookupTableName)) {
- HiveTable htable = new HiveTable(metadataManager, lookupTableName);
- LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable);
+ TableDesc tableDesc = metadataManager.getTableDesc(lookupTableName);
+ ReadableTable htable = TableSourceFactory.createReadableTable(tableDesc);
+ LookupBytesTable btable = new LookupBytesTable(tableDesc, join.getPrimaryKey(), htable);
lookupTables.put(lookupTableName, btable);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index db690b9..f89b143 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -31,21 +31,21 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author shaoshi
*/
-
public class InMemCuboidJob extends AbstractHadoopJob {
protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
@@ -62,7 +62,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_NCUBOID_LEVEL);
options.addOption(OPTION_INPUT_FORMAT);
- options.addOption(OPTION_TABLE_NAME);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_STATISTICS_OUTPUT);
parseOptions(options, args);
@@ -71,14 +70,12 @@ public class InMemCuboidJob extends AbstractHadoopJob {
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
- String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
String htableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-
KylinConfig config = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(config);
CubeInstance cube = cubeMgr.getCube(cubeName);
-
+ CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
logger.info("Starting: " + job.getJobName());
@@ -89,13 +86,9 @@ public class InMemCuboidJob extends AbstractHadoopJob {
DataModelDesc.RealizationCapacity realizationCapacity = cube.getDescriptor().getModel().getCapacity();
job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
- String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
- HCatInputFormat.setInput(job, dbTableNames[0],
- dbTableNames[1]);
-
- job.setInputFormatClass(HCatInputFormat.class);
-
// set Mapper
+ IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
job.setMapperClass(InMemCuboidMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Text.class);
@@ -112,8 +105,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
attachKylinPropsAndMetadata(cube, job.getConfiguration());
HTable htable = new HTable(conf, htableName);
- HFileOutputFormat.configureIncrementalLoad(job,
- htable);
+ HFileOutputFormat.configureIncrementalLoad(job, htable);
// set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index ba87dfe..3eb29df 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -2,6 +2,7 @@ package org.apache.kylin.job.hadoop.cubev2;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
@@ -16,7 +17,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.cube.CubeInstance;
@@ -25,7 +25,8 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
@@ -36,12 +37,13 @@ import com.google.common.collect.Maps;
/**
*/
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ImmutableBytesWritable, Text> {
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, Text> {
private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
private CubeInstance cube;
private CubeDesc cubeDesc;
private CubeSegment cubeSegment;
+ private IMRTableInputFormat flatTableInputFormat;
private int counter;
private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
@@ -59,6 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
cubeDesc = cube.getDescriptor();
String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
@@ -83,11 +86,13 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
}
@Override
- public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+ public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
// put each row to the queue
- List<String> row = HiveTableReader.getRowAsList(record);
+ String[] row = flatTableInputFormat.parseMapperInput(record);
+ List<String> rowAsList = Arrays.asList(row);
+
while (!future.isDone()) {
- if (queue.offer(row, 1, TimeUnit.SECONDS)) {
+ if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
counter++;
if (counter % BatchConstants.COUNTER_MAX == 0) {
logger.info("Handled " + counter + " records!");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
deleted file mode 100644
index c25b164..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.cmd.ICommandOutput;
-import org.apache.kylin.job.cmd.ShellCmd;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-
-/**
- */
-public class IIFlattenHiveJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(IIFlattenHiveJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- try {
- options.addOption(OPTION_II_NAME);
- parseOptions(options, args);
-
- String iiname = getOptionValue(OPTION_II_NAME);
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- IIInstance iiInstance = IIManager.getInstance(config).getII(iiname);
- IIDesc iidesc = IIDescManager.getInstance(config).getIIDesc(iiInstance.getDescName());
-
- String jobUUID = "00bf87b5-c7b5-4420-a12a-07f6b37b3187";
- JobEngineConfig engineConfig = new JobEngineConfig(config);
- IJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iidesc);
- String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobUUID);
- String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, //
- JobInstance.getJobWorkingDir(jobUUID, engineConfig.getHdfsWorkingDirectory()), jobUUID);
- String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobUUID, engineConfig);
-
- StringBuffer buf = new StringBuffer();
- buf.append("hive -e \"");
- buf.append(dropTableHql + "\n");
- buf.append(createTableHql + "\n");
- buf.append(insertDataHqls + "\n");
- buf.append("\"");
-
- System.out.println(buf.toString());
- System.out.println("========================");
-
- ShellCmd cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
- ICommandOutput output = cmd.execute();
- System.out.println(output.getOutput());
- System.out.println(output.getExitCode());
-
- return 0;
- } catch (Exception e) {
- logger.error("error execute IIFlattenHiveJob", e);
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- IIFlattenHiveJob job = new IIFlattenHiveJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
index 1ae101d..cc68e1b 100644
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -23,7 +23,6 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.job.common.HadoopShellExecutable;
@@ -38,6 +37,7 @@ import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
import com.google.common.base.Preconditions;
@@ -57,7 +57,7 @@ public final class IIJobBuilder {
IIJob result = initialJob(seg, "BUILD", submitter);
final String jobId = result.getId();
final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
- final String intermediateHiveTableName = intermediateTableDesc.getTableName(jobId);
+ final String intermediateHiveTableName = intermediateTableDesc.getTableName();
final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
final String iiPath = iiRootPath + "*";
@@ -84,7 +84,7 @@ public final class IIJobBuilder {
}
private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
- return JobBuilderSupport.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+ return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
}
private IIJob initialJob(IISegment seg, String type, String submitter) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 980b375..5637a09 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
@@ -52,6 +51,7 @@ import org.apache.kylin.job.inmemcubing.ICuboidWriter;
import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable.TableSignature;
import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.streaming.MicroStreamBatch;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/ITableSource.java b/job/src/main/java/org/apache/kylin/source/ITableSource.java
deleted file mode 100644
index ae1dccc..0000000
--- a/job/src/main/java/org/apache/kylin/source/ITableSource.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-public interface ITableSource {
-
- public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a0cd62e..4b01f24 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -18,15 +18,157 @@
package org.apache.kylin.source.hive;
-import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
public class HiveMRInput implements IMRInput {
@Override
- public IMRJobFlowParticipant createBuildFlowParticipant() {
- // TODO Auto-generated method stub
- return null;
+ public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ return new BatchCubingInputSide(seg);
+ }
+
+ @Override
+ public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+ return new HiveTableInputFormat(table.getIdentity());
+ }
+
+ public static class HiveTableInputFormat implements IMRTableInputFormat {
+ final String dbName;
+ final String tableName;
+
+ public HiveTableInputFormat(String hiveTable) {
+ String[] parts = HadoopUtil.parseHiveTableName(hiveTable);
+ dbName = parts[0];
+ tableName = parts[1];
+ }
+
+ @Override
+ public void configureJob(Job job) {
+ try {
+ HCatInputFormat.setInput(job, dbName, tableName);
+ job.setInputFormatClass(HCatInputFormat.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String[] parseMapperInput(Object mapperInput) {
+ return HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput);
+ }
+
+ }
+
+ public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
+
+ final JobEngineConfig conf;
+ final CubeSegment seg;
+ final CubeJoinedFlatTableDesc flatHiveTableDesc;
+
+ public BatchCubingInputSide(CubeSegment seg) {
+ this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ this.seg = seg;
+ this.flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+ }
+
+ @Override
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+ }
+
+ public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
+
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+ String insertDataHqls;
+ try {
+ insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+ }
+
+ ShellExecutable step = new ShellExecutable();
+ StringBuffer buf = new StringBuffer();
+ buf.append("hive -e \"");
+ buf.append(dropTableHql + "\n");
+ buf.append(createTableHql + "\n");
+ buf.append(insertDataHqls + "\n");
+ buf.append("\"");
+
+ step.setCmd(buf.toString());
+ step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+ return step;
+ }
+
+ @Override
+ public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow) {
+ GarbageCollectionStep step = new GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setOldHiveTable(flatHiveTableDesc.getTableName());
+ jobFlow.addTask(step);
+ }
+
+ @Override
+ public IMRTableInputFormat getFlatTableInputFormat() {
+ return new HiveTableInputFormat(flatHiveTableDesc.getTableName());
+ }
+
+ }
+
+ public static class GarbageCollectionStep extends AbstractExecutable {
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ StringBuffer output = new StringBuffer();
+
+ final String hiveTable = this.getOldHiveTable();
+ if (StringUtils.isNotEmpty(hiveTable)) {
+ final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS " + hiveTable + ";\"";
+ ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+ try {
+ context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ }
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ public void setOldHiveTable(String hiveTable) {
+ setParam("oldHiveTable", hiveTable);
+ }
+
+ private String getOldHiveTable() {
+ return getParam("oldHiveTable");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
new file mode 100644
index 0000000..76b9bab
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Management class to sync hive table metadata with command See main method for
+ * how to use the class
+ *
+ * @author jianliu
+ */
+public class HiveSourceTableLoader {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
+
+ public static final String OUTPUT_SURFIX = "json";
+ public static final String TABLE_FOLDER_NAME = "table";
+ public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
+
+ public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+
+ Map<String, Set<String>> db2tables = Maps.newHashMap();
+ for (String table : hiveTables) {
+ String[] parts = HadoopUtil.parseHiveTableName(table);
+ Set<String> set = db2tables.get(parts[0]);
+ if (set == null) {
+ set = Sets.newHashSet();
+ db2tables.put(parts[0], set);
+ }
+ set.add(parts[1]);
+ }
+
+ // extract from hive
+ Set<String> loadedTables = Sets.newHashSet();
+ for (String database : db2tables.keySet()) {
+ List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+ loadedTables.addAll(loaded);
+ }
+
+ return loadedTables;
+ }
+
+ private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+
+ List<String> loadedTables = Lists.newArrayList();
+ MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ for (String tableName : tables) {
+ Table table = null;
+ HiveClient hiveClient = new HiveClient();
+ List<FieldSchema> partitionFields = null;
+ List<FieldSchema> fields = null;
+ try {
+ table = hiveClient.getHiveTable(database, tableName);
+ partitionFields = table.getPartitionKeys();
+ fields = hiveClient.getHiveTableFields(database, tableName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ if (fields != null && partitionFields != null && partitionFields.size() > 0) {
+ fields.addAll(partitionFields);
+ }
+
+ long tableSize = hiveClient.getFileSizeForTable(table);
+ long tableFileNum = hiveClient.getFileNumberForTable(table);
+ TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
+ if (tableDesc == null) {
+ tableDesc = new TableDesc();
+ tableDesc.setDatabase(database.toUpperCase());
+ tableDesc.setName(tableName.toUpperCase());
+ tableDesc.setUuid(UUID.randomUUID().toString());
+ tableDesc.setLastModified(0);
+ }
+
+ int columnNumber = fields.size();
+ List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+ for (int i = 0; i < columnNumber; i++) {
+ FieldSchema field = fields.get(i);
+ ColumnDesc cdesc = new ColumnDesc();
+ cdesc.setName(field.getName().toUpperCase());
+ cdesc.setDatatype(field.getType());
+ cdesc.setId(String.valueOf(i + 1));
+ columns.add(cdesc);
+ }
+ tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+
+ StringBuffer partitionColumnString = new StringBuffer();
+ for (int i = 0, n = partitionFields.size(); i < n; i++) {
+ if (i > 0)
+ partitionColumnString.append(", ");
+ partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
+ }
+
+ Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
+
+ if (map == null) {
+ map = Maps.newHashMap();
+ }
+ map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
+ map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
+ map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
+ map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
+ map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
+ map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
+ map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
+ map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
+ map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
+ map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(partitionFields != null && partitionFields.size() > 0).toString());
+
+ metaMgr.saveSourceTable(tableDesc);
+ metaMgr.saveTableExd(tableDesc.getIdentity(), map);
+ loadedTables.add(tableDesc.getIdentity());
+ }
+
+
+ return loadedTables;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
new file mode 100644
index 0000000..02e7c45
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class HiveTable implements ReadableTable {
+
+ private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
+
+ final private String database;
+ final private String hiveTable;
+
+ private HiveClient hiveClient;
+
+ public HiveTable(TableDesc tableDesc) {
+ this.database = tableDesc.getDatabase();
+ this.hiveTable = tableDesc.getName();
+ }
+
+ @Override
+ public TableReader getReader() throws IOException {
+ return new HiveTableReader(database, hiveTable);
+ }
+
+ @Override
+ public TableSignature getSignature() throws IOException {
+ try {
+ String path = computeHDFSLocation();
+ Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+ long size = sizeAndLastModified.getFirst();
+ long lastModified = sizeAndLastModified.getSecond();
+
+ // for non-native hive table, cannot rely on size & last modified on HDFS
+ if (getHiveClient().isNativeTable(database, hiveTable) == false) {
+ lastModified = System.currentTimeMillis(); // assume table is ever changing
+ }
+
+ return new TableSignature(path, size, lastModified);
+
+ } catch (Exception e) {
+ if (e instanceof IOException)
+ throw (IOException) e;
+ else
+ throw new IOException(e);
+ }
+ }
+
+ private String computeHDFSLocation() throws Exception {
+
+ String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
+ if (override != null) {
+ logger.debug("Override hive table location " + hiveTable + " -- " + override);
+ return override;
+ }
+
+ return getHiveClient().getHiveTableLocation(database, hiveTable);
+ }
+
+ public HiveClient getHiveClient() {
+
+ if (hiveClient == null) {
+ hiveClient = new HiveClient();
+ }
+ return hiveClient;
+ }
+
+ @Override
+ public String toString() {
+ return "hive: database=[" + database + "], table=[" + hiveTable + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
new file mode 100644
index 0000000..35e24fe
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * An implementation of TableReader with HCatalog for Hive table.
+ */
+public class HiveTableReader implements TableReader {
+
+ private String dbName;
+ private String tableName;
+ private int currentSplit = -1;
+ private ReaderContext readCntxt = null;
+ private Iterator<HCatRecord> currentHCatRecordItr = null;
+ private HCatRecord currentHCatRecord;
+ private int numberOfSplits = 0;
+ private Map<String, String> partitionKV = null;
+
+ /**
+ * Constructor for reading whole hive table
+ * @param dbName
+ * @param tableName
+ * @throws IOException
+ */
+ public HiveTableReader(String dbName, String tableName) throws IOException {
+ this(dbName, tableName, null);
+ }
+
+ /**
+ * Constructor for reading a partition of the hive table
+ * @param dbName
+ * @param tableName
+ * @param partitionKV key-value pairs condition on the partition
+ * @throws IOException
+ */
+ public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.partitionKV = partitionKV;
+ initialize();
+ }
+
+ private void initialize() throws IOException {
+ try {
+ this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ this.numberOfSplits = readCntxt.numSplits();
+
+// HCatTableInfo tableInfo = HCatTableInfo.
+// HCatSchema schema = HCatBaseInputFormat.getTableSchema(context.getConfiguration);
+ }
+
+ @Override
+ public boolean next() throws IOException {
+
+ while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
+ currentSplit++;
+ if (currentSplit == numberOfSplits) {
+ return false;
+ }
+
+ currentHCatRecordItr = loadHCatRecordItr(readCntxt, currentSplit);
+ }
+
+ currentHCatRecord = currentHCatRecordItr.next();
+
+ return true;
+ }
+
+ @Override
+ public String[] getRow() {
+ return getRowAsStringArray(currentHCatRecord);
+ }
+
+ public List<String> getRowAsList() {
+ return getRowAsList(currentHCatRecord);
+ }
+
+ public static List<String> getRowAsList(HCatRecord record, List<String> rowValues) {
+ List<Object> allFields = record.getAll();
+ for (Object o : allFields) {
+ rowValues.add((o == null) ? null : o.toString());
+ }
+ return rowValues;
+ }
+
+ public static List<String> getRowAsList(HCatRecord record) {
+ return Arrays.asList(getRowAsStringArray(record));
+ }
+
+ public static String[] getRowAsStringArray(HCatRecord record) {
+ String[] arr = new String[record.size()];
+ for (int i = 0; i < arr.length; i++) {
+ Object o = record.get(i);
+ arr[i] = (o == null) ? null : o.toString();
+ }
+ return arr;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.readCntxt = null;
+ this.currentHCatRecordItr = null;
+ this.currentHCatRecord = null;
+ this.currentSplit = -1;
+ }
+
+ public String toString() {
+ return "hive table reader for: " + dbName + "." + tableName;
+ }
+
+ private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
+ HiveConf hiveConf = new HiveConf(HiveTableReader.class);
+ Iterator<Entry<String, String>> itr = hiveConf.iterator();
+ Map<String, String> map = new HashMap<String, String>();
+ while (itr.hasNext()) {
+ Entry<String, String> kv = itr.next();
+ map.put(kv.getKey(), kv.getValue());
+ }
+
+ ReadEntity entity;
+ if (partitionKV == null || partitionKV.size() == 0) {
+ entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
+ } else {
+ entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
+ }
+
+ HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
+ ReaderContext cntxt = reader.prepareRead();
+
+ return cntxt;
+ }
+
+ private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException {
+ HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
+
+ return currentHCatReader.read();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
index 4265519..a9e95a4 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
@@ -19,7 +19,9 @@
package org.apache.kylin.source.hive;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ITableSource;
+import org.apache.kylin.source.ReadableTable;
public class HiveTableSource implements ITableSource {
@@ -33,4 +35,9 @@ public class HiveTableSource implements ITableSource {
}
}
+ @Override
+ public ReadableTable createReadableTable(TableDesc tableDesc) {
+ return new HiveTable(tableDesc);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 9693771..1f91b25 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,18 +34,28 @@
package org.apache.kylin.job;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -58,6 +68,7 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.source.hive.HiveTableReader;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -68,16 +79,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
/**
*/
@@ -141,11 +143,11 @@ public class BuildIIWithStreamTest {
IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
final String uuid = UUID.randomUUID().toString();
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid);
String insertDataHqls;
try {
- insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
+ insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
} catch (IOException e1) {
e1.printStackTrace();
throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
@@ -163,7 +165,7 @@ public class BuildIIWithStreamTest {
logger.info(step.getCmd());
step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
- return intermediateTableDesc.getTableName(uuid);
+ return intermediateTableDesc.getTableName();
}
private void clearSegment(String iiName) throws Exception {
@@ -194,10 +196,7 @@ public class BuildIIWithStreamTest {
final IIDesc desc = iiManager.getII(iiName).getDescriptor();
final String tableName = createIntermediateTable(desc, kylinConfig);
logger.info("intermediate table name:" + tableName);
- final Configuration conf = new Configuration();
- HCatInputFormat.setInput(conf, "default", tableName);
- final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
- logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+
HiveTableReader reader = new HiveTableReader("default", tableName);
final List<TblColRef> tblColRefs = desc.listAllColumns();
for (TblColRef tblColRef : tblColRefs) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 5886324..32e5aff 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -39,11 +39,11 @@ import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.TrieDictionary;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.ReadableTable.TableSignature;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
index 15fe340..21f6a71 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
@@ -63,7 +63,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
@Test
public void testGenCreateTableDDL() {
- String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp", fakeJobUUID);
+ String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
System.out.println(ddl);
System.out.println("The length for the ddl is " + ddl.length());
@@ -71,14 +71,14 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
@Test
public void testGenDropTableDDL() {
- String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, fakeJobUUID);
+ String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
System.out.println(ddl);
assertEquals(107, ddl.length());
}
@Test
public void testGenerateInsertSql() throws IOException {
- String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, fakeJobUUID, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+ String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
System.out.println(sqls);
int length = sqls.length();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 9a28ea4..811948b 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -1,21 +1,19 @@
package org.apache.kylin.job.hadoop.invertedindex;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
-import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
import org.apache.kylin.common.util.FIFOIterable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
@@ -23,9 +21,11 @@ import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.*;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.FactDistinctIIColumnsMapper;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -41,16 +41,20 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.*;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StringStreamParser;
import org.apache.kylin.streaming.invertedindex.SliceBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
*/
@@ -233,29 +237,4 @@ public class IITest extends LocalFileMetadataTestCase {
}
}
- @Test
- public void factDistinctIIColumnsMapperTest() throws IOException {
- MapDriver<ImmutableBytesWritable, Result, LongWritable, Text> mapDriver;
- FactDistinctIIColumnsMapper mapper = new FactDistinctIIColumnsMapper();
- mapDriver = MapDriver.newMapDriver(mapper);
-
- mapDriver.getConfiguration().set(BatchConstants.CFG_II_NAME, iiName);
- mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- mapDriver.getConfiguration().setStrings("io.serializations", mapDriver.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName());
- mapDriver.addAll(Lists.newArrayList(Collections2.transform(iiRows, new Function<IIRow, Pair<ImmutableBytesWritable, Result>>() {
- @Nullable
- @Override
- public Pair<ImmutableBytesWritable, Result> apply(@Nullable IIRow input) {
- return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
- }
- })));
-
- List<Pair<LongWritable, Text>> result = mapDriver.run();
- Set<String> lstgNames = Sets.newHashSet("FP-non GTC", "ABIN");
- for (Pair<LongWritable, Text> pair : result) {
- Assert.assertEquals(pair.getFirst().get(), 6);
- Assert.assertTrue(lstgNames.contains(pair.getSecond().toString()));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
deleted file mode 100644
index c9ef366..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.job.hadoop.cardinality.HiveColumnCardinalityJob;
-
-/**
- * @author ysong1
- *
- */
-@Ignore("This test is invalid now as the mapper uses HCatalog to fetch the data which need a hive env")
-public class ColumnCardinalityJobTest {
-
- private Configuration conf;
-
- @Before
- public void setup() throws IOException {
- conf = new Configuration();
- conf.set("fs.default.name", "file:///");
- conf.set("mapred.job.tracker", "local");
- }
-
- @Test
- @Ignore("not maintaining")
- public void testJob() throws Exception {
- final String input = "src/test/resources/data/test_cal_dt/";
- final String output = "target/test-output/column-cardinality/";
-
- FileUtil.fullyDelete(new File(output));
-
- String[] args = { "-input", input, "-output", output, "-cols", "1,2,3,4,5,6,9,0" };
- assertEquals("Job failed", 0, ToolRunner.run(new HiveColumnCardinalityJob(), args));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
deleted file mode 100644
index e13289a..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.hadoop.cardinality.ColumnCardinalityMapper;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author ysong1
- *
- */
-@Ignore("This test is invalid now as the mapper uses HCatalog to fetch the data which need a hive env")
-public class ColumnCardinalityMapperTest {
-
- @SuppressWarnings("rawtypes")
- MapDriver mapDriver;
- String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Before
- public void setUp() {
- ColumnCardinalityMapper mapper = new ColumnCardinalityMapper();
- mapDriver = MapDriver.newMapDriver(mapper);
- }
-
- public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
-
- @SuppressWarnings({ "unchecked" })
- @Test
- @Ignore
- public void testMapperOn177() throws IOException {
- mapDriver.clearInput();
- File file = new File("src/test/resources/data/test_cal_dt/part-r-00000");
- FileReader reader = new FileReader(file);
- BufferedReader breader = new BufferedReader(reader);
- String s = breader.readLine();
- int i = 0;
- while (s != null) {
- LongWritable inputKey = new LongWritable(i++);
- mapDriver.addInput(inputKey, new Text(s));
- s = breader.readLine();
- }
- // breader.close();
- List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
- breader.close();
- assertEquals(9, result.size());
-
- int key1 = result.get(0).getFirst().get();
- BytesWritable value1 = result.get(0).getSecond();
- byte[] bytes = value1.getBytes();
- HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
- hllc.readRegisters(ByteBuffer.wrap(bytes));
- assertTrue(key1 > 0);
- assertEquals(8, hllc.getCountEstimate());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMapperOnComma() throws IOException {
- mapDriver.clearInput();
- LongWritable inputKey1 = new LongWritable(1);
- LongWritable inputKey2 = new LongWritable(2);
- LongWritable inputKey3 = new LongWritable(3);
- LongWritable inputKey4 = new LongWritable(4);
- LongWritable inputKey5 = new LongWritable(5);
- LongWritable inputKey6 = new LongWritable(6);
- LongWritable inputKey7 = new LongWritable(7);
-
- mapDriver.addInput(inputKey1, new Text());
- mapDriver.addInput(inputKey2, new Text(strArr));
- mapDriver.addInput(inputKey3, new Text(strArr));
- mapDriver.addInput(inputKey4, new Text(strArr));
- mapDriver.addInput(inputKey5, new Text(strArr));
- mapDriver.addInput(inputKey6, new Text(strArr));
- mapDriver.addInput(inputKey7, new Text(strArr));
-
- List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
-
- assertEquals(9, result.size());
-
- int key1 = result.get(0).getFirst().get();
- BytesWritable value1 = result.get(0).getSecond();
- byte[] bytes = value1.getBytes();
- HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
- hllc.readRegisters(ByteBuffer.wrap(bytes));
- System.out.println("ab\177ab".length());
- assertTrue(key1 > 0);
- assertEquals(1, hllc.getCountEstimate());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
index 2c01fb6..867ee82 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
@@ -46,6 +46,8 @@ import org.apache.kylin.cube.kv.RowConstants;
*
*/
public class ColumnCardinalityReducerTest {
+
+ public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
ReduceDriver<IntWritable, BytesWritable, IntWritable, LongWritable> reduceDriver;
String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
@@ -76,23 +78,23 @@ public class ColumnCardinalityReducerTest {
public void testReducer() throws IOException {
IntWritable key1 = new IntWritable(1);
List<BytesWritable> values1 = new ArrayList<BytesWritable>();
- values1.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr)));
+ values1.add(new BytesWritable(getBytes(strArr)));
IntWritable key2 = new IntWritable(2);
List<BytesWritable> values2 = new ArrayList<BytesWritable>();
- values2.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " x")));
+ values2.add(new BytesWritable(getBytes(strArr + " x")));
IntWritable key3 = new IntWritable(3);
List<BytesWritable> values3 = new ArrayList<BytesWritable>();
- values3.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xx")));
+ values3.add(new BytesWritable(getBytes(strArr + " xx")));
IntWritable key4 = new IntWritable(4);
List<BytesWritable> values4 = new ArrayList<BytesWritable>();
- values4.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xxx")));
+ values4.add(new BytesWritable(getBytes(strArr + " xxx")));
IntWritable key5 = new IntWritable(5);
List<BytesWritable> values5 = new ArrayList<BytesWritable>();
- values5.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xxxx")));
+ values5.add(new BytesWritable(getBytes(strArr + " xxxx")));
reduceDriver.withInput(key1, values1);
reduceDriver.withInput(key2, values2);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..de6df2c
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+ @Before
+ public void setup() throws Exception {
+ super.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ super.cleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws IOException {
+ if (!useSandbox())
+ return;
+
+ KylinConfig config = getTestConfig();
+ String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
+ Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+
+ assertTrue(loaded.size() == toLoad.length);
+ for (String str : toLoad)
+ assertTrue(loaded.contains(str));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
new file mode 100644
index 0000000..624f158
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.source.hive.HiveTableReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * This test case needs the Hive runtime; please run it with the sandbox.
+ * @author shaoshi
+ *
+ * It is in the exclude list of the default profile in pom.xml.
+ */
+public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
+
+
+ @Test
+ public void test() throws IOException {
+ HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+ int rowNumber = 0;
+ while (reader.next()) {
+ String[] row = reader.getRow();
+ Assert.assertEquals(9, row.length);
+ //System.out.println(ArrayUtils.toString(row));
+ rowNumber++;
+ }
+
+ reader.close();
+ Assert.assertEquals(10000, rowNumber);
+ }
+}