You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:33:26 UTC
[03/19] incubator-kylin git commit: KYLIN-1112 on the way: Reorganize
InvertedIndex source codes into plug-in architecture
KYLIN-1112 on the way: Reorganize InvertedIndex source codes into plug-in architecture
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d921f3ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d921f3ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d921f3ca
Branch: refs/heads/KYLIN-1112-2
Commit: d921f3ca55a20e095f619a2fc1c585543d05c0c1
Parents: b74a83b
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 29 16:52:17 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:24:16 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/BuildIIWithEngineTest.java | 4 +-
.../apache/kylin/job/BuildIIWithStreamTest.java | 9 +-
.../kylin/job/hadoop/invertedindex/IICLI.java | 106 +++++++++
.../kylin/job/hadoop/invertedindex/IITest.java | 2 +-
engine-mr/pom.xml | 21 +-
.../CreateInvertedIndexDictionaryJob.java | 70 ++++++
.../engine/mr/invertedindex/IIBulkLoadJob.java | 74 ++++++
.../mr/invertedindex/IICreateHFileJob.java | 81 +++++++
.../mr/invertedindex/IICreateHFileMapper.java | 55 +++++
.../mr/invertedindex/IICreateHTableJob.java | 148 ++++++++++++
.../invertedindex/IIDeployCoprocessorCLI.java | 157 +++++++++++++
.../IIDistinctColumnsCombiner.java | 58 +++++
.../mr/invertedindex/IIDistinctColumnsJob.java | 136 +++++++++++
.../invertedindex/IIDistinctColumnsMapper.java | 66 ++++++
.../invertedindex/IIDistinctColumnsReducer.java | 77 +++++++
.../kylin/engine/mr/invertedindex/IIJob.java | 50 ++++
.../engine/mr/invertedindex/IIJobBuilder.java | 224 ++++++++++++++++++
.../mr/invertedindex/InvertedIndexJob.java | 164 +++++++++++++
.../mr/invertedindex/InvertedIndexMapper.java | 90 ++++++++
.../invertedindex/InvertedIndexPartitioner.java | 73 ++++++
.../mr/invertedindex/InvertedIndexReducer.java | 100 ++++++++
engine-streaming/pom.xml | 5 +
.../streaming/invertedindex/SliceBuilder.java | 81 +++++++
invertedindex/pom.xml | 19 +-
.../invertedindex/streaming/SliceBuilder.java | 81 -------
.../dict/CreateInvertedIndexDictionaryJob.java | 70 ------
.../job/hadoop/invertedindex/IIBulkLoadJob.java | 74 ------
.../hadoop/invertedindex/IICreateHFileJob.java | 81 -------
.../invertedindex/IICreateHFileMapper.java | 55 -----
.../hadoop/invertedindex/IICreateHTableJob.java | 148 ------------
.../invertedindex/IIDeployCoprocessorCLI.java | 157 -------------
.../IIDistinctColumnsCombiner.java | 58 -----
.../invertedindex/IIDistinctColumnsJob.java | 136 -----------
.../invertedindex/IIDistinctColumnsMapper.java | 66 ------
.../invertedindex/IIDistinctColumnsReducer.java | 77 -------
.../hadoop/invertedindex/InvertedIndexJob.java | 164 -------------
.../invertedindex/InvertedIndexMapper.java | 90 --------
.../invertedindex/InvertedIndexPartitioner.java | 73 ------
.../invertedindex/InvertedIndexReducer.java | 100 --------
.../apache/kylin/job/invertedindex/IIJob.java | 50 ----
.../kylin/job/invertedindex/IIJobBuilder.java | 230 -------------------
.../java/org/apache/kylin/job/tools/IICLI.java | 106 ---------
42 files changed, 1843 insertions(+), 1843 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index a3b3db2..47415a8 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -44,8 +44,8 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.invertedindex.IIJob;
-import org.apache.kylin.job.invertedindex.IIJobBuilder;
+import org.apache.kylin.engine.mr.invertedindex.IIJob;
+import org.apache.kylin.engine.mr.invertedindex.IIJobBuilder;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 61cf262..4ccc6b4 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -40,12 +40,7 @@ import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -66,11 +61,11 @@ import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.invertedindex.streaming.SliceBuilder;
+import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.engine.mr.invertedindex.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.hive.HiveTableReader;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java
new file mode 100644
index 0000000..7e7be34
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+
+/**
+ * @author yangli9
+ */
+public class IICLI {
+
+    /**
+     * Decodes an inverted-index sequence file and prints every record it contains.
+     *
+     * <p>Usage: {@code IICLI <iiName> <sequenceFilePath>}. The records are decoded with the
+     * {@link TableRecordInfo} of the first segment of the named inverted index.
+     *
+     * @param args {@code args[0]} is the inverted index name, {@code args[1]} the sequence file path
+     * @throws IOException if the sequence file cannot be opened
+     * @throws IllegalArgumentException if arguments are missing or the inverted index does not exist
+     */
+    public static void main(String[] args) throws IOException {
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException("Usage: IICLI <iiName> <sequenceFilePath>");
+        }
+
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        String iiName = args[0];
+        IIInstance ii = mgr.getII(iiName);
+        if (ii == null) {
+            throw new IllegalArgumentException("Inverted index '" + iiName + "' not found");
+        }
+
+        String path = args[1];
+        System.out.println("Reading from " + path + " ...");
+
+        TableRecordInfo info = new TableRecordInfo(ii.getFirstSegment());
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+        int count = 0;
+        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
+            for (RawTableRecord rec : slice) {
+                // print, not printf: the record text is data, and any '%' in it would be
+                // interpreted as a format specifier (IllegalFormatException / garbled output)
+                System.out.print(new TableRecord(rec, info).toString());
+                count++;
+            }
+        }
+        System.out.println("Total " + count + " records");
+    }
+
+    /**
+     * Exposes the key/value pairs of a Hadoop sequence file as {@link IIRow}s.
+     *
+     * <p>The returned iterator reuses a single {@link IIRow} (backed by the same key/value
+     * writables) for every element, so callers must consume or copy each row before advancing.
+     * The underlying reader is shared by all iterators obtained from the returned iterable and is
+     * closed once it reports end-of-file or a read fails.
+     *
+     * @param hconf Hadoop configuration used to open the file
+     * @param path  path of the sequence file
+     * @return a single-pass iterable over the file's rows
+     * @throws IOException if the file cannot be opened
+     */
+    public static Iterable<IIRow> readSequenceKVs(Configuration hconf, String path) throws IOException {
+        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+        return new Iterable<IIRow>() {
+            @Override
+            public Iterator<IIRow> iterator() {
+                return new Iterator<IIRow>() {
+                    final ImmutableBytesWritable k = new ImmutableBytesWritable();
+                    final ImmutableBytesWritable v = new ImmutableBytesWritable();
+                    final IIRow pair = new IIRow(k, v, null);
+                    // true when k/v hold a record that next() has not handed out yet;
+                    // makes hasNext() idempotent instead of advancing the reader on every call
+                    boolean fetched = false;
+                    // true once the reader hit EOF (or failed) and has been closed
+                    boolean exhausted = false;
+
+                    @Override
+                    public boolean hasNext() {
+                        if (fetched) {
+                            return true;
+                        }
+                        if (exhausted) {
+                            return false;
+                        }
+                        boolean hasNext = false;
+                        try {
+                            hasNext = reader.next(k, v);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            if (!hasNext) {
+                                exhausted = true;
+                                IOUtils.closeQuietly(reader);
+                            }
+                        }
+                        fetched = hasNext;
+                        return hasNext;
+                    }
+
+                    @Override
+                    public IIRow next() {
+                        if (!hasNext()) {
+                            throw new java.util.NoSuchElementException("No more rows in sequence file");
+                        }
+                        fetched = false;
+                        return pair;
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index e113c06..200156a 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -35,7 +35,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
import org.apache.kylin.invertedindex.model.IIRow;
import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.invertedindex.streaming.SliceBuilder;
+import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 7a2bfe5..4500555 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -34,7 +34,14 @@
</properties>
<dependencies>
-
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-cube</artifactId>
@@ -42,23 +49,21 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-job</artifactId>
+ <artifactId>kylin-invertedindex</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-storage</artifactId>
+ <artifactId>kylin-core-job</artifactId>
<version>${project.parent.version}</version>
</dependency>
-
- <!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
+ <artifactId>kylin-core-storage</artifactId>
<version>${project.parent.version}</version>
</dependency>
+
+ <!-- Env & Test -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
new file mode 100644
index 0000000..39d74b4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ */
+public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
+
+    /**
+     * Builds the dictionaries for the first segment of the given inverted index.
+     *
+     * <p>The distinct values of each column are read from {@code <inputPath>/<columnName>}.
+     *
+     * @param args command-line arguments carrying the II name and input path options
+     * @return 0 on success
+     * @throws Exception if option parsing or dictionary building fails (usage is printed first)
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            final String iiname = getOptionValue(OPTION_II_NAME);
+            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+            final KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+            IIManager mgr = IIManager.getInstance(config);
+            IIInstance ii = mgr.getII(iiname);
+            if (ii == null) {
+                // fail with a clear message instead of an NPE on ii.getFirstSegment()
+                throw new IllegalArgumentException("Inverted index '" + iiname + "' not found");
+            }
+
+            mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
+                @Override
+                public ReadableTable getDistinctValuesFor(TblColRef col) {
+                    // each column's distinct values live in a sub-path named after the column
+                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
+                }
+            });
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    /** Command-line entry point: runs the job through {@link ToolRunner} and exits with its return code. */
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
+        System.exit(exitCode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java
new file mode 100644
index 0000000..a0a5ca6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIBulkLoadJob.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ */
+public class IIBulkLoadJob extends AbstractHadoopJob {
+
+    /**
+     * Bulk-loads the HFiles under the input path into the given HTable and, on success, marks the
+     * first segment of the inverted index READY with that table as its storage location.
+     *
+     * @param args command-line arguments carrying the input path, HTable name and II name options
+     * @return the exit code of {@link LoadIncrementalHFiles}; 0 means success
+     * @throws Exception if option parsing, the permission change, the load or the metadata update fails
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            options.addOption(OPTION_II_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            String iiname = getOptionValue(OPTION_II_NAME);
+
+            // resolve the II up front so an unknown name fails before any data is loaded
+            IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+            IIInstance ii = mgr.getII(iiname);
+            if (ii == null) {
+                throw new IllegalArgumentException("Inverted index '" + iiname + "' not found");
+            }
+
+            // open up the column-family directory; presumably so the HBase user (which may differ
+            // from the job user) can move the HFiles during the load -- TODO confirm
+            FileSystem fs = FileSystem.get(getConf());
+            FsPermission permission = new FsPermission((short) 0777);
+            fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);
+
+            int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
+            if (hbaseExitCode != 0) {
+                // do not publish a segment whose data did not make it into HBase
+                logger.error("Bulk load into " + tableName + " failed with exit code " + hbaseExitCode + "; segment left unchanged");
+                return hbaseExitCode;
+            }
+
+            IISegment seg = ii.getFirstSegment();
+            seg.setStorageLocationIdentifier(tableName);
+            seg.setStatus(SegmentStatusEnum.READY);
+            mgr.updateII(ii);
+
+            return hbaseExitCode;
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java
new file mode 100644
index 0000000..4ab3051
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileJob.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ *
+ */
+public class IICreateHFileJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(IICreateHFileJob.class);
+
+    /**
+     * Runs a map-only job that converts the inverted-index sequence files under the input path
+     * into HFiles for the given HTable, written to the output path.
+     *
+     * @param args command-line arguments carrying job name, II name, input/output paths and HTable name
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws Exception if option parsing, job setup or job execution fails (usage is printed first)
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            setJobClasspath(job);
+
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+            FileOutputFormat.setOutputPath(job, output);
+
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(IICreateHFileMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
+            try {
+                // reads the table's region boundaries and family settings into the job configuration
+                HFileOutputFormat.configureIncrementalLoad(job, htable);
+            } finally {
+                // the table handle is only needed for configuration; release its connection resources
+                htable.close();
+            }
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java
new file mode 100644
index 0000000..fdcc138
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHFileMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+/**
+ * @author yangli9
+ */
+public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
+
+    /** Timestamp stamped on every cell this mapper emits; captured once in {@link #setup}. */
+    long timestamp;
+
+    /** Binds the task configuration and captures the write timestamp for this mapper. */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        timestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * Turns one (row key, value) pair into a Put {@link KeyValue} in the inverted-index column
+     * family and qualifier, and emits it keyed by the same row key.
+     */
+    @Override
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+        final byte[] family = IIDesc.HBASE_FAMILY_BYTES;
+        final byte[] qualifier = IIDesc.HBASE_QUALIFIER_BYTES;
+
+        final KeyValue cell = new KeyValue(
+                key.get(), key.getOffset(), key.getLength(),
+                family, 0, family.length,
+                qualifier, 0, qualifier.length,
+                timestamp, Type.Put,
+                value.get(), value.getOffset(), value.getLength());
+
+        context.write(key, cell);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java
new file mode 100644
index 0000000..3ccd701
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IICreateHTableJob.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class IICreateHTableJob extends AbstractHadoopJob {
+
+    /**
+     * Creates the HBase table that stores an inverted index, dropping any existing table with the
+     * same name first. The table gets one pre-split region per shard of the II descriptor, region
+     * splitting is disabled, and the II coprocessor is deployed on it.
+     *
+     * @param args command-line arguments carrying the II name and HTable name options
+     * @return 0 on success
+     * @throws Exception if option parsing or any HBase operation fails (usage is printed first)
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String iiName = getOptionValue(OPTION_II_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            IIManager iiManager = IIManager.getInstance(config);
+            IIInstance ii = iiManager.getII(iiName);
+            if (ii == null) {
+                throw new IllegalArgumentException("Inverted index '" + iiName + "' not found");
+            }
+            int sharding = ii.getDescriptor().getSharding();
+
+            // compute split keys before touching HBase, so an invalid sharding fails
+            // without dropping the existing table
+            byte[][] splitKeys = getSplits(sharding);
+
+            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+            HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
+            cf.setMaxVersions(1);
+            configureCompression(cf, config.getHbaseDefaultCompressionCodec());
+            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+            tableDesc.addFamily(cf);
+            tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
+            tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
+
+            HBaseAdmin admin = new HBaseAdmin(conf);
+            try {
+                // drop the table first
+                if (admin.tableExists(tableName)) {
+                    admin.disableTable(tableName);
+                    admin.deleteTable(tableName);
+                }
+
+                // create table; no split keys -> pass null for a single initial region
+                admin.createTable(tableDesc, splitKeys.length == 0 ? null : splitKeys);
+                for (int i = 0; i < splitKeys.length; i++) {
+                    System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
+                }
+                System.out.println("create hbase table " + tableName + " done.");
+            } finally {
+                // release the admin connection even if drop/create fails
+                admin.close();
+            }
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    /**
+     * Applies the configured default HBase compression codec to the column family. Recognized
+     * names are snappy, lzo, gz/gzip and lz4 (case-insensitive); any other or missing name leaves
+     * the family's compression setting untouched.
+     */
+    private void configureCompression(HColumnDescriptor cf, String codec) {
+        // Locale.ROOT: with a default-locale toLowerCase(), "GZIP" becomes "gzıp" under a Turkish
+        // locale and would silently fall through to "no compression"
+        String hbaseDefaultCC = codec == null ? "" : codec.toLowerCase(java.util.Locale.ROOT);
+
+        switch (hbaseDefaultCC) {
+        case "snappy": {
+            logger.info("hbase will use snappy to compress data");
+            cf.setCompressionType(Compression.Algorithm.SNAPPY);
+            break;
+        }
+        case "lzo": {
+            logger.info("hbase will use lzo to compress data");
+            cf.setCompressionType(Compression.Algorithm.LZO);
+            break;
+        }
+        case "gz":
+        case "gzip": {
+            logger.info("hbase will use gzip to compress data");
+            cf.setCompressionType(Compression.Algorithm.GZ);
+            break;
+        }
+        case "lz4": {
+            logger.info("hbase will use lz4 to compress data");
+            cf.setCompressionType(Compression.Algorithm.LZ4);
+            break;
+        }
+        default: {
+            logger.info("hbase will not user any compression codec to compress data");
+        }
+        }
+    }
+
+    /**
+     * Computes the region split keys for one region per shard: shard ids {@code 1..shard-1},
+     * each written unsigned into {@code IIKeyValueCodec.SHARD_LEN} bytes.
+     *
+     * @param shard number of shards; must be at least 1
+     * @return {@code shard - 1} split keys (empty for a single shard)
+     * @throws IllegalArgumentException if {@code shard < 1}
+     */
+    private byte[][] getSplits(int shard) {
+        if (shard < 1) {
+            // previously surfaced as a NegativeArraySizeException after the old table was dropped
+            throw new IllegalArgumentException("Inverted index sharding must be at least 1, but was " + shard);
+        }
+        byte[][] result = new byte[shard - 1][];
+        for (int i = 1; i < shard; ++i) {
+            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java
new file mode 100644
index 0000000..da4b95b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDeployCoprocessorCLI.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * THIS IS A TAILORED DUPLICATE OF org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI TO AVOID CYCLIC
+ * DEPENDENCY. INVERTED-INDEX CODE NOW SPLITTED ACROSS kylin-invertedindex AND kylin-storage-hbase.
+ * DEFENITELY NEED FURTHER REFACTOR.
+ */
+public class IIDeployCoprocessorCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(IIDeployCoprocessorCLI.class);
+
+ public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
+ public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
+ public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
+
+ public static void deployCoprocessor(HTableDescriptor tableDesc) {
+ try {
+ initHTableCoprocessor(tableDesc);
+ logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+
+ } catch (Exception ex) {
+ logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+ logger.error("Will try creating the table without coprocessor.");
+ }
+ }
+
+ private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+ addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ }
+
+ private static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Add coprocessor on " + desc.getNameAsString());
+ desc.addCoprocessor(IIEndpointClass, hdfsCoprocessorJar, 1000, null);
+ desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+ desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
+ }
+
+ private static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+ Path uploadPath = null;
+ File localCoprocessorFile = new File(localCoprocessorJar);
+
+ // check existing jars
+ if (oldJarPaths == null) {
+ oldJarPaths = new HashSet<String>();
+ }
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+ if (isSame(localCoprocessorFile, fileStatus)) {
+ uploadPath = fileStatus.getPath();
+ break;
+ }
+ String filename = fileStatus.getPath().toString();
+ if (filename.endsWith(".jar")) {
+ oldJarPaths.add(filename);
+ }
+ }
+
+ // upload if not existing
+ if (uploadPath == null) {
+ // figure out a unique new jar file name
+ Set<String> oldJarNames = new HashSet<String>();
+ for (String path : oldJarPaths) {
+ oldJarNames.add(new Path(path).getName());
+ }
+ String baseName = getBaseFileName(localCoprocessorJar);
+ String newName = null;
+ int i = 0;
+ while (newName == null) {
+ newName = baseName + "-" + (i++) + ".jar";
+ if (oldJarNames.contains(newName))
+ newName = null;
+ }
+
+ // upload
+ uploadPath = new Path(coprocessorDir, newName);
+ FileInputStream in = null;
+ FSDataOutputStream out = null;
+ try {
+ in = new FileInputStream(localCoprocessorFile);
+ out = fileSystem.create(uploadPath);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+
+ fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+ }
+
+ uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+ return uploadPath;
+ }
+
+ private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
+ return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+ }
+
+ private static String getBaseFileName(String localCoprocessorJar) {
+ File localJar = new File(localCoprocessorJar);
+ String baseName = localJar.getName();
+ if (baseName.endsWith(".jar"))
+ baseName = baseName.substring(0, baseName.length() - ".jar".length());
+ return baseName;
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+ Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+ fileSystem.mkdirs(coprocessorDir);
+ return coprocessorDir;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
new file mode 100644
index 0000000..651ad63
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
+
+ private Text outputValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+
+ }
+
+ @Override
+ public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+ HashSet<ByteArray> set = new HashSet<ByteArray>();
+ for (Text textValue : values) {
+ ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+ set.add(value);
+ }
+
+ for (ByteArray value : set) {
+ outputValue.set(value.array(), value.offset(), value.length());
+ context.write(key, outputValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
new file mode 100644
index 0000000..c9f5375
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsJob extends AbstractHadoopJob {
+ protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_TABLE_NAME);
+ options.addOption(OPTION_II_NAME);
+ options.addOption(OPTION_OUTPUT_PATH);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
+ String iiName = getOptionValue(OPTION_II_NAME);
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+ // ----------------------------------------------------------------------------
+
+ logger.info("Starting: " + job.getJobName() + " on table " + tableName);
+
+ IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+ IIInstance ii = iiMgr.getII(iiName);
+ job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
+ job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
+
+ setJobClasspath(job);
+
+ setupMapper();
+ setupReducer(output);
+
+ return waitForCompletion(job);
+
+ } catch (Exception e) {
+ printUsage(options);
+ throw e;
+ }
+
+ }
+
+ private String getColumns(IIInstance ii) {
+ IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
+ StringBuilder buf = new StringBuilder();
+ for (IntermediateColumnDesc col : iiflat.getColumnList()) {
+ if (buf.length() > 0)
+ buf.append(",");
+ buf.append(col.getColumnName());
+ }
+ return buf.toString();
+ }
+
+ private void setupMapper() throws IOException {
+
+ String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
+ String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);
+
+ logger.info("setting hcat input format, db name {} , table name {}", dbTableNames[0], dbTableNames[1]);
+
+ HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
+
+ job.setInputFormatClass(HCatInputFormat.class);
+
+ job.setMapperClass(IIDistinctColumnsMapper.class);
+ job.setCombinerClass(IIDistinctColumnsCombiner.class);
+ job.setMapOutputKeyClass(ShortWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ }
+
+ private void setupReducer(Path output) throws IOException {
+ job.setReducerClass(IIDistinctColumnsReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ FileOutputFormat.setOutputPath(job, output);
+ job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+ job.setNumReduceTasks(1);
+
+ deletePath(job.getConfiguration(), output);
+ }
+
+ public static void main(String[] args) throws Exception {
+ IIDistinctColumnsJob job = new IIDistinctColumnsJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
new file mode 100644
index 0000000..0f0c731
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
+
+ private ShortWritable outputKey = new ShortWritable();
+ private Text outputValue = new Text();
+ private HCatSchema schema = null;
+ private int columnSize = 0;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+ columnSize = schema.getFields().size();
+ }
+
+ @Override
+ public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+
+ HCatFieldSchema fieldSchema = null;
+ for (short i = 0; i < columnSize; i++) {
+ outputKey.set(i);
+ fieldSchema = schema.get(i);
+ Object fieldValue = record.get(fieldSchema.getName(), schema);
+ if (fieldValue == null)
+ continue;
+ byte[] bytes = Bytes.toBytes(fieldValue.toString());
+ outputValue.set(bytes, 0, bytes.length);
+ context.write(outputKey, outputValue);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
new file mode 100644
index 0000000..d50385f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
+
+ private String[] columns;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+
+ Configuration conf = context.getConfiguration();
+ this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+ }
+
+ @Override
+ public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ String columnName = columns[key.get()];
+
+ HashSet<ByteArray> set = new HashSet<ByteArray>();
+ for (Text textValue : values) {
+ ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+ set.add(value);
+ }
+
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+ FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
+
+ try {
+ for (ByteArray value : set) {
+ out.write(value.array(), value.offset(), value.length());
+ out.write('\n');
+ }
+ } finally {
+ out.close();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
new file mode 100644
index 0000000..30e653b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ */
+public class IIJob extends DefaultChainedExecutable {
+
+ public IIJob() {
+ super();
+ }
+
+ private static final String II_INSTANCE_NAME = "iiName";
+ private static final String SEGMENT_ID = "segmentId";
+
+ void setIIName(String name) {
+ setParam(II_INSTANCE_NAME, name);
+ }
+
+ public String getIIName() {
+ return getParam(II_INSTANCE_NAME);
+ }
+
+ void setSegmentId(String segmentId) {
+ setParam(SEGMENT_ID, segmentId);
+ }
+
+ public String getSegmentId() {
+ return getParam(SEGMENT_ID);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
new file mode 100644
index 0000000..2605e65
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public final class IIJobBuilder {
+
+ final JobEngineConfig engineConfig;
+
+ public IIJobBuilder(JobEngineConfig engineConfig) {
+ this.engineConfig = engineConfig;
+ }
+
+ public IIJob buildJob(IISegment seg, String submitter) {
+ checkPreconditions(seg);
+
+ IIJob result = initialJob(seg, "BUILD", submitter);
+ final String jobId = result.getId();
+ final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
+ final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
+ final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
+ final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
+ final String iiPath = iiRootPath + "*";
+
+ final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
+ result.addTask(intermediateHiveTableStep);
+
+ result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
+
+ result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
+
+ result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
+
+ // create htable step
+ result.addTask(createCreateHTableStep(seg));
+
+ // generate hfiles step
+ result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
+
+ // bulk load step
+ result.addTask(createBulkLoadStep(seg, jobId));
+
+ return result;
+ }
+
+ private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
+ return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+ }
+
+ private IIJob initialJob(IISegment seg, String type, String submitter) {
+ IIJob result = new IIJob();
+ SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+ format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
+ result.setIIName(seg.getIIInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
+ result.setSubmitter(submitter);
+ return result;
+ }
+
+ private void checkPreconditions(IISegment seg) {
+ Preconditions.checkNotNull(seg, "segment cannot be null");
+ Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
+ }
+
+ private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
+ try {
+ String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
+ if (jobConf != null && jobConf.length() > 0) {
+ builder.append(" -conf ").append(jobConf);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
+ return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
+ }
+
+ private String getHFilePath(IISegment seg, String jobId) {
+ return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
+ }
+
+ private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+ result.setMapReduceJobClass(IIDistinctColumnsJob.class);
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, engineConfig);
+ appendExecCmdParameters(cmd, "tablename", factTableName);
+ appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+ appendExecCmdParameters(cmd, "output", output);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
+
+ result.setMapReduceParams(cmd.toString());
+ return result;
+ }
+
+ private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
+ // base cuboid job
+ HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+ buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+ appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
+
+ buildDictionaryStep.setJobParams(cmd.toString());
+ buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
+ return buildDictionaryStep;
+ }
+
+ private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
+ // base cuboid job
+ MapReduceExecutable buildIIStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, engineConfig);
+
+ buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
+
+ appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+ appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
+ appendExecCmdParameters(cmd, "output", iiOutputTempPath);
+ appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
+
+ buildIIStep.setMapReduceParams(cmd.toString());
+ buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
+ return buildIIStep;
+ }
+
+ private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
+ HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+ createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+ createHtableStep.setJobParams(cmd.toString());
+ createHtableStep.setJobClass(IICreateHTableJob.class);
+
+ return createHtableStep;
+ }
+
+ private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
+ MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+ createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, engineConfig);
+ appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
+
+ createHFilesStep.setMapReduceParams(cmd.toString());
+ createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
+
+ return createHFilesStep;
+ }
+
+ private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
+ HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+ bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+
+ bulkLoadStep.setJobParams(cmd.toString());
+ bulkLoadStep.setJobClass(IIBulkLoadJob.class);
+
+ return bulkLoadStep;
+
+ }
+
+ private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+ return buf.append(" -").append(paraName).append(" ").append(paraValue);
+ }
+
+ private String getJobWorkingDir(String uuid) {
+ return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
+ }
+
+ private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
+ return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
new file mode 100644
index 0000000..fc5f939
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MapReduce job that builds the slices of an Inverted Index from its Hive intermediate table.
+ * <p>
+ * Rows are read through {@link HCatInputFormat} by {@link InvertedIndexMapper}, routed to one
+ * reducer per shard by {@link InvertedIndexPartitioner}, and packed into slices by
+ * {@link InvertedIndexReducer}. The result is written as a SequenceFile of
+ * (ImmutableBytesWritable, ImmutableBytesWritable) pairs under the output path.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
+
+    /**
+     * Parses the command line, configures the job, submits it and waits for it to finish.
+     *
+     * @param args job name, II name, intermediate table name and output path options
+     * @return the exit code returned by {@code waitForCompletion(job)}
+     * @throws Exception on invalid arguments or setup failure; usage is printed before rethrowing
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String iiname = getOptionValue(OPTION_II_NAME);
+            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            System.out.println("Starting: " + job.getJobName());
+
+            IIInstance ii = getII(iiname);
+            short sharding = ii.getDescriptor().getSharding();
+
+            setJobClasspath(job);
+
+            setupMapper(intermediateTable);
+            // One reducer per shard; the partitioner routes each record by its shard id.
+            setupReducer(output, sharding);
+            attachMetadata(ii);
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+
+    }
+
+    /**
+     * Looks up the Inverted Index by name.
+     *
+     * @throws IllegalArgumentException if no Inverted Index has that name
+     */
+    private IIInstance getII(String iiName) {
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIInstance ii = mgr.getII(iiName);
+        if (ii == null)
+            throw new IllegalArgumentException("No Inverted Index found by name " + iiName);
+        return ii;
+    }
+
+    /**
+     * Ships the II metadata with the job and records the II name and first segment name in the
+     * job configuration, where the mapper, partitioner and reducer read them back.
+     *
+     * @throws IllegalStateException if the II has no segment
+     */
+    private void attachMetadata(IIInstance ii) throws IOException {
+        IISegment seg = ii.getFirstSegment();
+        if (seg == null) {
+            throw new IllegalStateException("Inverted Index " + ii.getName() + " has no segment to build");
+        }
+
+        Configuration conf = job.getConfiguration();
+        attachKylinPropsAndMetadata(ii, conf);
+
+        conf.set(BatchConstants.CFG_II_NAME, ii.getName());
+        conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
+    }
+
+    /**
+     * Collects the metadata resources the tasks need and attaches them to {@code conf}:
+     * the II, its data model, the II descriptor, every table of the model, and the
+     * dictionaries of all segments.
+     *
+     * @throws IllegalStateException if a table referenced by the model is missing from metadata
+     */
+    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        // write II / model_desc / II_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(ii.getResourcePath());
+        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
+        dumpList.add(ii.getDescriptor().getResourcePath());
+
+        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            if (table == null) {
+                throw new IllegalStateException("Table " + tableName + " used by Inverted Index " + ii.getName() + " is not found in metadata");
+            }
+            dumpList.add(table.getResourcePath());
+        }
+        for (IISegment segment : ii.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
+    /**
+     * Configures HCatalog input from the {@code db.table} intermediate table and the
+     * mapper/partitioner with (LongWritable, ImmutableBytesWritable) map output.
+     */
+    private void setupMapper(String intermediateTable) throws IOException {
+
+        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
+        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+
+        job.setMapperClass(InvertedIndexMapper.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
+        job.setPartitionerClass(InvertedIndexPartitioner.class);
+    }
+
+    /**
+     * Configures the reducer, SequenceFile output, and {@code sharding} reduce tasks, then
+     * deletes whatever already exists at {@code output}.
+     */
+    private void setupReducer(Path output, short sharding) throws IOException {
+        job.setReducerClass(InvertedIndexReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(ImmutableBytesWritable.class);
+
+        job.setNumReduceTasks(sharding);
+
+        FileOutputFormat.setOutputPath(job, output);
+
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    public static void main(String[] args) throws Exception {
+        InvertedIndexJob job = new InvertedIndexJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
new file mode 100644
index 0000000..81921f1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * Converts each HCatalog row of the intermediate table into an encoded {@link TableRecord}.
+ * <p>
+ * The output key is the record's timestamp. The output value wraps the record's byte array.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    private LongWritable outputKey;
+    // Built over rec.getBytes(), i.e. shares rec's backing byte array.
+    private ImmutableBytesWritable outputValue;
+    private List<HCatFieldSchema> fields;
+
+    /**
+     * Loads the II segment named in the job configuration (status NEW) and prepares the
+     * reusable record and output writables.
+     *
+     * @throws IllegalStateException if the II or its NEW segment cannot be found
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        IIInstance ii = mgr.getII(iiName);
+        if (ii == null) {
+            throw new IllegalStateException("No Inverted Index found by name " + iiName);
+        }
+        String segName = conf.get(BatchConstants.CFG_II_SEGMENT_NAME);
+        IISegment seg = ii.getSegment(segName, SegmentStatusEnum.NEW);
+        if (seg == null) {
+            throw new IllegalStateException("No NEW segment " + segName + " found in Inverted Index " + iiName);
+        }
+        this.info = new TableRecordInfo(seg);
+        this.rec = this.info.createTableRecord();
+
+        outputKey = new LongWritable();
+        outputValue = new ImmutableBytesWritable(rec.getBytes());
+
+        HCatSchema schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+        fields = schema.getFields();
+    }
+
+    /**
+     * Encodes one row into {@code rec} and emits (timestamp, record bytes).
+     */
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+
+        rec.reset();
+        // Copy each column, by position, as its string form; null stays null.
+        for (int i = 0; i < fields.size(); i++) {
+            Object fieldValue = record.get(i);
+            rec.setValueString(i, fieldValue == null ? null : fieldValue.toString());
+        }
+
+        outputKey.set(rec.getTimestamp());
+        // outputValue's backing bytes array is the same as rec
+
+        context.write(outputKey, outputValue);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java
new file mode 100644
index 0000000..dcf707f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * Routes each encoded {@link TableRecord} to the partition equal to its shard id.
+ * <p>
+ * The number of reduce tasks is expected to equal the II's sharding. InvertedIndexJob sets it
+ * that way; otherwise Hadoop rejects out-of-range partitions.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
+
+    private Configuration conf;
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    /**
+     * Decodes the record bytes and returns the record's shard as the partition number.
+     */
+    @Override
+    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
+        rec.setBytes(value.get(), value.getOffset(), value.getLength());
+        return rec.getShard();
+    }
+
+    /**
+     * Stores the configuration and loads the II segment (status NEW) named in it.
+     *
+     * @throws IllegalStateException if the II or its NEW segment cannot be found
+     * @throws RuntimeException wrapping any IOException raised while loading metadata
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        String segName = conf.get(BatchConstants.CFG_II_SEGMENT_NAME);
+        try {
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+            IIManager mgr = IIManager.getInstance(config);
+            IIInstance ii = mgr.getII(iiName);
+            if (ii == null) {
+                throw new IllegalStateException("No Inverted Index found by name " + iiName);
+            }
+            IISegment seg = ii.getSegment(segName, SegmentStatusEnum.NEW);
+            if (seg == null) {
+                throw new IllegalStateException("No NEW segment " + segName + " found in Inverted Index " + iiName);
+            }
+            this.info = new TableRecordInfo(seg);
+            this.rec = this.info.createTableRecord();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to load metadata for Inverted Index " + iiName + ", segment " + segName, e);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d921f3ca/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
new file mode 100644
index 0000000..7644456
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * Packs the encoded records of one shard into slices and writes each slice as (key, value) rows.
+ * <p>
+ * All records reaching one reducer are assumed to share a shard: the builder is created with
+ * the shard of the first record seen.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    // Created lazily on the first record; stays null if this reducer receives no input.
+    private IncrementalSliceMaker builder;
+    private IIKeyValueCodec kv;
+
+    /**
+     * Loads the II segment (status NEW) named in the job configuration and prepares the codec.
+     *
+     * @throws IllegalStateException if the II or its NEW segment cannot be found
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        IIInstance ii = mgr.getII(iiName);
+        if (ii == null) {
+            throw new IllegalStateException("No Inverted Index found by name " + iiName);
+        }
+        String segName = conf.get(BatchConstants.CFG_II_SEGMENT_NAME);
+        IISegment seg = ii.getSegment(segName, SegmentStatusEnum.NEW);
+        if (seg == null) {
+            throw new IllegalStateException("No NEW segment " + segName + " found in Inverted Index " + iiName);
+        }
+        info = new TableRecordInfo(seg);
+        rec = info.createTableRecord();
+        builder = null;
+        kv = new IIKeyValueCodec(info.getDigest());
+    }
+
+    /**
+     * Appends each record to the slice builder and writes out every slice the builder completes.
+     */
+    @Override
+    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+            throws IOException, InterruptedException {
+        for (ImmutableBytesWritable v : values) {
+            rec.setBytes(v.get(), v.getOffset(), v.getLength());
+
+            if (builder == null) {
+                builder = new IncrementalSliceMaker(info, rec.getShard());
+            }
+
+            Slice slice = builder.append(rec);
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    /**
+     * Flushes the last, partially filled slice, if any.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        // A reducer whose shard received no records never created a builder; nothing to flush.
+        if (builder == null) {
+            return;
+        }
+        Slice slice = builder.close();
+        if (slice != null) {
+            output(slice, context);
+        }
+    }
+
+    /** Encodes a slice into key/value rows and writes them to the job output. */
+    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+        for (IIRow pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getKey(), pair.getValue());
+        }
+    }
+
+}