You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 09:13:12 UTC
[3/4] incubator-kylin git commit: KYLIN-1112 Reorganize InvertedIndex
source codes into plug-in architecture
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
new file mode 100644
index 0000000..18d3001
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
@@ -0,0 +1,219 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+//*/
+//
+//package org.apache.kylin.engine.mr.invertedindex;
+//
+//import java.io.IOException;
+//import java.text.SimpleDateFormat;
+//import java.util.Date;
+//import java.util.TimeZone;
+//
+//import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+//import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+//import org.apache.kylin.invertedindex.IISegment;
+//import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+//import org.apache.kylin.job.constant.ExecutableConstants;
+//import org.apache.kylin.job.engine.JobEngineConfig;
+//import org.apache.kylin.job.execution.AbstractExecutable;
+//import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+//
+//import com.google.common.base.Preconditions;
+//
+///**
+// */
+//public final class IIJobBuilder {
+//
+// final JobEngineConfig engineConfig;
+//
+// public IIJobBuilder(JobEngineConfig engineConfig) {
+// this.engineConfig = engineConfig;
+// }
+//
+// public IIJob buildJob(IISegment seg, String submitter) {
+// checkPreconditions(seg);
+//
+// IIJob result = initialJob(seg, "BUILD", submitter);
+// final String jobId = result.getId();
+// final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
+// final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
+// final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
+// final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
+// final String iiPath = iiRootPath + "*";
+//
+// final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
+// result.addTask(intermediateHiveTableStep);
+//
+// result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
+//
+// result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
+//
+// result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
+//
+// // create htable step
+// result.addTask(createCreateHTableStep(seg));
+//
+// // generate hfiles step
+// result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
+//
+// // bulk load step
+// result.addTask(createBulkLoadStep(seg, jobId));
+//
+// return result;
+// }
+//
+// private IIJob initialJob(IISegment seg, String type, String submitter) {
+// IIJob result = new IIJob();
+// SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+// format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
+// result.setIIName(seg.getIIInstance().getName());
+// result.setSegmentId(seg.getUuid());
+// result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
+// result.setSubmitter(submitter);
+// return result;
+// }
+//
+// private void checkPreconditions(IISegment seg) {
+// Preconditions.checkNotNull(seg, "segment cannot be null");
+// Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
+// }
+//
+// private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
+// try {
+// String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
+// if (jobConf != null && jobConf.length() > 0) {
+// builder.append(" -conf ").append(jobConf);
+// }
+// } catch (IOException e) {
+// throw new RuntimeException(e);
+// }
+// }
+//
+// private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
+// return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
+// }
+//
+// private String getHFilePath(IISegment seg, String jobId) {
+// return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
+// }
+//
+// private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
+// MapReduceExecutable result = new MapReduceExecutable();
+// result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+// result.setMapReduceJobClass(IIDistinctColumnsJob.class);
+// StringBuilder cmd = new StringBuilder();
+// appendMapReduceParameters(cmd, engineConfig);
+// appendExecCmdParameters(cmd, "tablename", factTableName);
+// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+// appendExecCmdParameters(cmd, "output", output);
+// appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
+//
+// result.setMapReduceParams(cmd.toString());
+// return result;
+// }
+//
+// private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
+// // base cuboid job
+// HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+// buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+// StringBuilder cmd = new StringBuilder();
+// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+// appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
+//
+// buildDictionaryStep.setJobParams(cmd.toString());
+// buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
+// return buildDictionaryStep;
+// }
+//
+// private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
+// // base cuboid job
+// MapReduceExecutable buildIIStep = new MapReduceExecutable();
+//
+// StringBuilder cmd = new StringBuilder();
+// appendMapReduceParameters(cmd, engineConfig);
+//
+// buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
+//
+// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+// appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
+// appendExecCmdParameters(cmd, "output", iiOutputTempPath);
+// appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
+//
+// buildIIStep.setMapReduceParams(cmd.toString());
+// buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
+// return buildIIStep;
+// }
+//
+// private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
+// HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+// createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+// StringBuilder cmd = new StringBuilder();
+// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+//
+// createHtableStep.setJobParams(cmd.toString());
+// createHtableStep.setJobClass(IICreateHTableJob.class);
+//
+// return createHtableStep;
+// }
+//
+// private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
+// MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+// createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
+// StringBuilder cmd = new StringBuilder();
+//
+// appendMapReduceParameters(cmd, engineConfig);
+// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+// appendExecCmdParameters(cmd, "input", inputPath);
+// appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
+// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+// appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
+//
+// createHFilesStep.setMapReduceParams(cmd.toString());
+// createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
+//
+// return createHFilesStep;
+// }
+//
+// private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
+// HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+// bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+//
+// StringBuilder cmd = new StringBuilder();
+// appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
+// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+//
+// bulkLoadStep.setJobParams(cmd.toString());
+// bulkLoadStep.setJobClass(IIBulkLoadJob.class);
+//
+// return bulkLoadStep;
+//
+// }
+//
+// private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+// return buf.append(" -").append(paraName).append(" ").append(paraValue);
+// }
+//
+// private String getJobWorkingDir(String uuid) {
+// return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
+// }
+//
+// private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
+// return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
+// }
+//}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
new file mode 100644
index 0000000..bcae524
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
+
+    /**
+     * Parses the command line, configures the map and reduce sides for the named
+     * inverted index, then hands the job to {@code waitForCompletion(job)}.
+     *
+     * @return the value returned by {@code waitForCompletion(job)}
+     * @throws Exception any failure, rethrown after the usage text has been printed
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_II_NAME);
+            // Registered but its value is not read here: setupMapper() configures the input from the segment.
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String iiname = getOptionValue(OPTION_II_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            logger.info("Starting: " + job.getJobName());
+
+            IIInstance ii = getII(iiname);
+            short sharding = ii.getDescriptor().getSharding();
+
+            setJobClasspath(job);
+            setupMapper(ii.getFirstSegment());
+            setupReducer(output, sharding);
+            attachMetadata(ii);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    /** Looks up the inverted index by name; throws IllegalArgumentException when none is found. */
+    private IIInstance getII(String iiName) {
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIInstance ii = mgr.getII(iiName);
+        if (ii == null)
+            throw new IllegalArgumentException("No Inverted Index found by name " + iiName);
+        return ii;
+    }
+
+    /** Attaches Kylin properties/metadata to the job and records the II and first-segment names in its configuration. */
+    private void attachMetadata(IIInstance ii) throws IOException {
+        Configuration conf = job.getConfiguration();
+        attachKylinPropsAndMetadata(ii, conf);
+
+        IISegment seg = ii.getFirstSegment();
+        conf.set(BatchConstants.CFG_II_NAME, ii.getName());
+        conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
+    }
+
+    /** Lets the segment's flat-table input format configure the job, then sets mapper, map output types and partitioner. */
+    private void setupMapper(IISegment segment) throws IOException {
+        IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
+
+        job.setMapperClass(InvertedIndexMapper.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
+        job.setPartitionerClass(InvertedIndexPartitioner.class);
+    }
+
+    /**
+     * Sets reducer and output classes, uses {@code sharding} as the reduce task count, and registers {@code output}.
+     */
+    private void setupReducer(Path output, short sharding) throws IOException {
+        job.setReducerClass(InvertedIndexReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(ImmutableBytesWritable.class);
+
+        job.setNumReduceTasks(sharding);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    /** Command-line entry point: runs the job through ToolRunner and exits with its return code. */
+    public static void main(String[] args) throws Exception {
+        InvertedIndexJob job = new InvertedIndexJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
new file mode 100644
index 0000000..88249ed
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec; // reused for every input row
+
+    private LongWritable outputKey;
+    private ImmutableBytesWritable outputValue; // built over rec.getBytes() in setup()
+    private IMRInput.IMRTableInputFormat flatTableInputFormat;
+
+    /** Loads the II segment named in the job configuration and prepares the reusable record and output writables. */
+    @Override
+    protected void setup(Context context) throws IOException {
+        final Configuration jobConf = context.getConfiguration();
+        super.bindCurrentConfiguration(jobConf);
+
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        final IIInstance instance = IIManager.getInstance(kylinConfig).getII(jobConf.get(BatchConstants.CFG_II_NAME));
+        final IISegment segment = instance.getSegment(jobConf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+
+        this.info = new TableRecordInfo(segment);
+        this.rec = this.info.createTableRecord();
+        this.outputKey = new LongWritable();
+        this.outputValue = new ImmutableBytesWritable(this.rec.getBytes());
+        this.flatTableInputFormat = MRUtil.getBatchCubingInputSide(instance.getFirstSegment()).getFlatTableInputFormat();
+    }
+
+    /** Copies the non-null columns of one parsed input row into the record and emits it keyed by the record's timestamp. */
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        final String[] columns = flatTableInputFormat.parseMapperInput(record);
+
+        rec.reset();
+        for (int col = 0; col < columns.length; col++) {
+            if (columns[col] == null) {
+                continue;
+            }
+            rec.setValueString(col, columns[col]);
+        }
+
+        // outputValue was built over rec.getBytes() in setup(), so only the key
+        // has to be refreshed before writing
+        outputKey.set(rec.getTimestamp());
+        context.write(outputKey, outputValue);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java
new file mode 100644
index 0000000..dcf707f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
+
+    private Configuration conf;
+    private TableRecordInfo info;
+    private TableRecord rec; // scratch record, re-pointed at each value in getPartition()
+
+    /** Returns the shard of the record encoded in {@code value}; {@code key} and {@code numPartitions} are not consulted. */
+    @Override
+    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
+        rec.setBytes(value.get(), value.getOffset(), value.getLength());
+        return rec.getShard();
+    }
+
+    /** Stores the configuration and loads the II segment it names; an IOException is rethrown as RuntimeException. */
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        try {
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+            IIInstance ii = IIManager.getInstance(config).getII(conf.get(BatchConstants.CFG_II_NAME));
+            IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+            this.info = new TableRecordInfo(seg);
+            this.rec = this.info.createTableRecord();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to initialize InvertedIndexPartitioner from job configuration", e);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
new file mode 100644
index 0000000..7644456
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    private IncrementalSliceMaker builder; // created lazily from the first record seen by reduce()
+    private IIKeyValueCodec kv;
+
+    /** Loads the II segment named in the job configuration and prepares the record and the key/value codec. */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        info = new TableRecordInfo(seg);
+        rec = info.createTableRecord();
+        builder = null;
+        kv = new IIKeyValueCodec(info.getDigest());
+    }
+
+    /** Appends every record of the key group to the slice maker and writes each slice it hands back. */
+    @Override
+    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+            throws IOException, InterruptedException {
+        for (ImmutableBytesWritable v : values) {
+            rec.setBytes(v.get(), v.getOffset(), v.getLength());
+
+            if (builder == null) {
+                builder = new IncrementalSliceMaker(info, rec.getShard());
+            }
+
+            Slice slice = builder.append(rec);
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    /** Writes the slice returned by closing the slice maker, if there is one. */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        // builder is still null when reduce() never saw a record; there is nothing to flush then
+        Slice slice = (builder == null) ? null : builder.close();
+        if (slice != null) {
+            output(slice, context);
+        }
+    }
+
+    /** Encodes the slice with the key/value codec and writes every resulting row to the context. */
+    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+        for (IIRow pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getKey(), pair.getValue());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
new file mode 100644
index 0000000..277dea5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+
+/**
+ */
+public class UpdateInvertedIndexInfoAfterBuildStep extends AbstractExecutable {
+
+    private static final String SEGMENT_ID = "segmentId";
+    private static final String II_NAME = "iiName";
+    private static final String JOB_ID = "jobId";
+
+    /** Stores the inverted index name (passed as {@code cubeName}) under the "iiName" step parameter. */
+    public void setInvertedIndexName(String cubeName) {
+        setParam(II_NAME, cubeName);
+    }
+
+    private String getInvertedIndexName() {
+        return getParam(II_NAME);
+    }
+
+    public void setSegmentId(String segmentId) {
+        setParam(SEGMENT_ID, segmentId);
+    }
+
+    // NOTE(review): not called within this class; doWork() takes the first segment instead.
+    private String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    public void setJobId(String id) {
+        setParam(JOB_ID, id);
+    }
+
+    private String getJobId() {
+        return getParam(JOB_ID);
+    }
+
+    /**
+     * Marks the first segment of the named inverted index as READY, stamps it with
+     * the job id and the current time, and saves the index through the II manager.
+     * @return SUCCEED once saved; ERROR carrying the exception message when saving throws an IOException
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final IIManager manager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        final IIInstance instance = manager.getII(getInvertedIndexName());
+        final IISegment builtSegment = instance.getFirstSegment();
+        builtSegment.setStatus(SegmentStatusEnum.READY);
+        builtSegment.setLastBuildJobID(getJobId());
+        builtSegment.setLastBuildTime(System.currentTimeMillis());
+
+        try {
+            manager.updateII(instance);
+        } catch (IOException e) {
+            logger.error("fail to update inverted index after build", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index 2e65ae6..9f17d60 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,18 +48,18 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
}
public DefaultChainedExecutable build() {
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
final String jobId = result.getId();
inputSide.addStepPhase1_CreateFlatTable(result);
- final CubeJoinedFlatTableDesc joinedFlatTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+ final IJoinedFlatTableDesc joinedFlatTableDesc = seg.getJoinedFlatTableDesc();
final String tableName = joinedFlatTableDesc.getTableName();
logger.info("intermediate table:" + tableName);
final SparkExecutable sparkExecutable = new SparkExecutable();
sparkExecutable.setClassName(SparkCubing.class.getName());
sparkExecutable.setParam("hiveTable", tableName);
- sparkExecutable.setParam("cubeName", seg.getCubeInstance().getName());
+ sparkExecutable.setParam("cubeName", seg.getRealization().getName());
sparkExecutable.setParam("segmentId", seg.getUuid());
sparkExecutable.setParam("confPath", confPath);
sparkExecutable.setParam("coprocessor", coprocessor);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
index 955124c..46b63b3 100644
--- a/engine-streaming/pom.xml
+++ b/engine-streaming/pom.xml
@@ -26,6 +26,11 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-invertedindex</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-storage</artifactId>
<version>${project.parent.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
new file mode 100644
index 0000000..fa5a0b2
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.streaming.invertedindex;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.invertedindex.index.BatchSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/** Builds inverted-index {@link Slice}s from a {@link StreamingBatch}, using a {@link BatchSliceMaker} created for the given shard and, optionally, dictionaries built from the batch itself.
+ */
+public final class SliceBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class); // NOTE(review): not final and not referenced anywhere in this class
+
+ private final BatchSliceMaker sliceMaker;
+ private final IIDesc iiDesc;
+ private final boolean useLocalDict; // when false, buildSlice supplies an array of null dictionaries instead of building them
+
+ public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) { // keeps the descriptor and flag; the slice maker is created once for (desc, shard)
+ this.iiDesc = desc;
+ this.sliceMaker = new BatchSliceMaker(desc, shard);
+ this.useLocalDict = useLocalDict;
+ }
+
+ public Slice buildSlice(StreamingBatch microStreamBatch) { // maps each message to its data row, chooses dictionaries, then delegates to build()
+ final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() { // Guava's Lists.transform returns a lazy view, so apply() runs on each element access
+ @Nullable
+ @Override
+ public List<String> apply(@Nullable StreamingMessage input) {
+ return input.getData(); // NOTE(review): input is annotated @Nullable but dereferenced without a guard
+ }
+ });
+ final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()]; // else-branch: all-null array with one slot per entry of listAllColumns()
+ TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+ return build(messages, tableRecordInfo, dictionaries);
+ }
+
+ private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) { // turns each row into a TableRecord, makes the slice, then attaches the dictionaries to it
+ final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+ @Nullable
+ @Override
+ public TableRecord apply(@Nullable List<String> input) {
+ TableRecord result = tableRecordInfo.createTableRecord();
+ for (int i = 0; i < input.size(); i++) { // positional copy: element i of the row becomes value i of the record
+ result.setValueString(i, input.get(i));
+ }
+ return result;
+ }
+ }));
+ slice.setLocalDictionaries(localDictionary); // the array passed in is stored as-is, including the all-null case
+ return slice;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/examples/test_case_data/sandbox/hbase-site.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml
index 734908e..46d5345 100644
--- a/examples/test_case_data/sandbox/hbase-site.xml
+++ b/examples/test_case_data/sandbox/hbase-site.xml
@@ -190,5 +190,22 @@
<name>zookeeper.znode.parent</name>
<value>/hbase-unsecure</value>
</property>
-
+ <property>
+ <name>hbase.client.pause</name>
+ <value>100</value>
+ <description>General client pause value. Used mostly as value to wait
+ before running a retry of a failed get, region lookup, etc.
+ See hbase.client.retries.number for description of how we backoff from
+ this initial pause amount and how this pause works w/ retries.</description>
+ </property>
+ <property>
+ <name>hbase.client.retries.number</name>
+ <value>5</value>
+ <description>Maximum retries. Used as maximum for all retryable
+ operations such as the getting of a cell's value, starting a row update,
+ etc. Retry interval is a rough function based on hbase.client.pause. At
+ first we retry at this interval but then with backoff, we pretty quickly reach
+ retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backoff
+ ramps up. Change this setting and hbase.client.pause to suit your workload.</description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/pom.xml
----------------------------------------------------------------------
diff --git a/invertedindex/pom.xml b/invertedindex/pom.xml
index 78850f4..70f9d3f 100644
--- a/invertedindex/pom.xml
+++ b/invertedindex/pom.xml
@@ -33,19 +33,28 @@
<!--Kylin Jar -->
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-engine-streaming</artifactId>
+ <artifactId>kylin-core-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-source-hive</artifactId>
+ <artifactId>kylin-core-metadata</artifactId>
<version>${project.parent.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-dictionary</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.n3twork.druid</groupId>
<artifactId>extendedset</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>compress-lzf</artifactId>
+ </dependency>
<!-- Env & Test -->
<dependency>
@@ -55,12 +64,8 @@
<scope>test</scope>
<version>${project.parent.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- <version>${hive-hcatalog.version}</version>
- <scope>provided</scope>
- </dependency>
+
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 5ce11f8..20f289c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -26,12 +26,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.*;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
@@ -48,7 +43,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
*/
@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class IIInstance extends RootPersistentEntity implements IRealization {
+public class IIInstance extends RootPersistentEntity implements IRealization, IBuildable {
public static IIInstance create(String iiName, String projectName, IIDesc iiDesc) {
IIInstance iii = new IIInstance();
@@ -373,4 +368,13 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
return IStorageAware.ID_HBASE;
}
+ @Override
+ public int getEngineType() { // delegates to the II descriptor's engine type
+ return getDescriptor().getEngineType();
+ }
+
+ @Override
+ public int getSourceType() { // taken from the fact table descriptor of this II's data model
+ return getDataModelDesc().getFactTableDesc().getSourceType();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index af848de..adcca8b 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -29,6 +29,9 @@ import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -37,6 +40,8 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
/**
* @author honma
@@ -44,7 +49,7 @@ import com.google.common.base.Objects;
// TODO: remove segment concept for II, append old hbase table
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class IISegment implements Comparable<IISegment>, IDictionaryAware {
+public class IISegment implements Comparable<IISegment>, IDictionaryAware, IRealizationSegment {
@JsonBackReference
private IIInstance iiInstance;
@@ -108,6 +113,7 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware {
// ============================================================================
+ @Override
public String getUuid() {
return uuid;
}
@@ -116,6 +122,7 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware {
this.uuid = id;
}
+ @Override
public String getName() {
return name;
}
@@ -204,6 +211,7 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware {
this.iiInstance = iiInstance;
}
+ @Override
public String getStorageLocationIdentifier() {
return storageLocationIdentifier;
}
@@ -289,4 +297,28 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware {
this.createTimeUTC = createTimeUTC;
}
+ @Override
+ public int getEngineType() {
+ return 0; // NOTE(review): hard-coded; IIInstance#getEngineType in this commit delegates to the descriptor -- confirm 0 is intended here
+ }
+
+ @Override
+ public int getSourceType() {
+ return 0; // NOTE(review): hard-coded; IIInstance#getSourceType in this commit reads the fact table descriptor -- confirm 0 is intended here
+ }
+
+ @Override
+ public int getStorageType() {
+ return 0; // NOTE(review): hard-coded; IIDesc gains a storage_type field in this commit -- confirm whether this should read it
+ }
+
+ @Override
+ public IRealization getRealization() { // the owning IIInstance (the @JsonBackReference field) is exposed as the realization
+ return iiInstance;
+ }
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc() { // builds a new IIJoinedFlatTableDesc from the II descriptor on every call; nothing is cached
+ return new IIJoinedFlatTableDesc(this.getIIDesc());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index fee16ba..bfa4eaa 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -35,14 +35,7 @@ import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DimensionDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.*;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -87,6 +80,13 @@ public class IIDesc extends RootPersistentEntity {
private int sliceSize = 50000; // no. rows
@JsonProperty("useLocalDictionary")
private boolean useLocalDictionary = true;
+
+ @JsonProperty("engine_type")
+ private int engineType = IEngineAware.ID_MR_II;
+
+ @JsonProperty("storage_type")
+ private int storageType = IStorageAware.ID_HBASE;
+
@JsonProperty("signature")
private String signature;
@@ -399,4 +399,20 @@ public class IIDesc extends RootPersistentEntity {
}
+ public int getStorageType() { // value of the "storage_type" JSON property; the field defaults to IStorageAware.ID_HBASE
+ return storageType;
+ }
+
+ public void setStorageType(int storageType) { // plain setter for the "storage_type" property; no validation of the id
+ this.storageType = storageType;
+ }
+
+ public int getEngineType() { // value of the "engine_type" JSON property; the field defaults to IEngineAware.ID_MR_II
+ return engineType;
+ }
+
+ public void setEngineType(int engineType) { // plain setter for the "engine_type" property; no validation of the id
+ this.engineType = engineType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 4f3ab80..7e54a98 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -19,6 +19,7 @@
package org.apache.kylin.invertedindex.model;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@@ -70,7 +71,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
private IIRow collectKeyValues(Slice slice, int col, CompressedValueContainer container) {
ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col);
ImmutableBytesWritable value = container.toBytes();
- final Dictionary<?> dictionary = slice.getLocalDictionaries()[col];
+ final Dictionary<?> dictionary = slice.getLocalDictionaries() != null ? slice.getLocalDictionaries()[col] : null;
if (dictionary == null) {
return new IIRow(key, value, new ImmutableBytesWritable(BytesUtil.EMPTY_BYTE_ARRAY));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
deleted file mode 100644
index ba337c8..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.invertedindex.streaming;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
-import org.apache.kylin.invertedindex.index.BatchSliceMaker;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- */
-public final class SliceBuilder {
-
- private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
-
- private final BatchSliceMaker sliceMaker;
- private final IIDesc iiDesc;
- private final boolean useLocalDict;
-
- public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
- this.iiDesc = desc;
- this.sliceMaker = new BatchSliceMaker(desc, shard);
- this.useLocalDict = useLocalDict;
- }
-
- public Slice buildSlice(StreamingBatch microStreamBatch) {
- final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamingMessage input) {
- return input.getData();
- }
- });
- final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
- TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
- return build(messages, tableRecordInfo, dictionaries);
- }
-
- private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
- final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
- @Nullable
- @Override
- public TableRecord apply(@Nullable List<String> input) {
- TableRecord result = tableRecordInfo.createTableRecord();
- for (int i = 0; i < input.size(); i++) {
- result.setValueString(i, input.get(i));
- }
- return result;
- }
- }));
- slice.setLocalDictionaries(localDictionary);
- return slice;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
deleted file mode 100644
index 87ee70e..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.dict;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_INPUT_PATH);
- parseOptions(options, args);
-
- final String iiname = getOptionValue(OPTION_II_NAME);
- final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
- final KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- IIManager mgr = IIManager.getInstance(config);
- IIInstance ii = mgr.getII(iiname);
-
- mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
- @Override
- public ReadableTable getDistinctValuesFor(TblColRef col) {
- return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
- }
- });
- return 0;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
deleted file mode 100644
index 300c89b..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- */
-public class IIBulkLoadJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_II_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- String input = getOptionValue(OPTION_INPUT_PATH);
- String iiname = getOptionValue(OPTION_II_NAME);
-
- FileSystem fs = FileSystem.get(getConf());
- FsPermission permission = new FsPermission((short) 0777);
- fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);
-
- int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
-
- IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
- IIInstance ii = mgr.getII(iiname);
- IISegment seg = ii.getFirstSegment();
- seg.setStorageLocationIdentifier(tableName);
- seg.setStatus(SegmentStatusEnum.READY);
- mgr.updateII(ii);
-
- return hbaseExitCode;
-
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
deleted file mode 100644
index 528f06f..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- *
- */
-public class IICreateHFileJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(IICreateHFileJob.class);
-
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
- FileOutputFormat.setOutputPath(job, output);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(IICreateHFileMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java
deleted file mode 100644
index 1adf8d6..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * Mapper that turns each input key/value pair into an HBase {@link KeyValue} of type
- * {@code Put}, using the inverted-index column family and qualifier from {@link IIDesc}.
- * The input key is emitted unchanged as the output key.
- *
- * @author yangli9
- */
-public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
-
-    // Captured once in setup(); every KeyValue written by this mapper instance carries it.
-    long timestamp;
-
-    /**
-     * Binds the job configuration and records the timestamp used for all emitted cells.
-     */
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        this.timestamp = System.currentTimeMillis();
-    }
-
-    /**
-     * Emits one {@code Put} KeyValue per input record: row = input key bytes, value = input value bytes.
-     */
-    @Override
-    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
-        final byte[] family = IIDesc.HBASE_FAMILY_BYTES;
-        final byte[] qualifier = IIDesc.HBASE_QUALIFIER_BYTES;
-
-        final KeyValue cell = new KeyValue(
-                key.get(), key.getOffset(), key.getLength(),
-                family, 0, family.length,
-                qualifier, 0, qualifier.length,
-                timestamp, Type.Put,
-                value.get(), value.getOffset(), value.getLength());
-
-        context.write(key, cell);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
deleted file mode 100644
index 0b7cb7a..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-
-/**
- * Job that (re)creates the HBase table for an inverted index: it builds the table descriptor,
- * drops the table if it already exists, and creates it pre-split by shard.
- *
- * @author George Song (ysong1)
- */
-public class IICreateHTableJob extends AbstractHadoopJob {
-
-    /**
-     * Parses the II name and HTable name options and creates the table.
-     *
-     * @param args command-line arguments carrying the II name and HTable name options
-     * @return 0 on success
-     * @throws Exception any failure is rethrown after the usage text has been printed
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_HTABLE_NAME);
-            parseOptions(options, args);
-
-            String tableName = getOptionValue(OPTION_HTABLE_NAME);
-            String iiName = getOptionValue(OPTION_II_NAME);
-
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            IIManager iiManager = IIManager.getInstance(config);
-            IIInstance ii = iiManager.getII(iiName);
-            int sharding = ii.getDescriptor().getSharding();
-
-            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
-            HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
-            cf.setMaxVersions(1);
-            applyCompression(cf, config.getHbaseDefaultCompressionCodec());
-
-            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
-            tableDesc.addFamily(cf);
-            tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
-            tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
-            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
-
-            Configuration conf = HBaseConfiguration.create(getConf());
-            if (User.isHBaseSecurityEnabled(conf)) {
-                // add coprocessor for bulk load
-                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-            }
-
-            IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
-            HBaseAdmin admin = new HBaseAdmin(conf);
-            try {
-                // drop the table first
-                if (admin.tableExists(tableName)) {
-                    admin.disableTable(tableName);
-                    admin.deleteTable(tableName);
-                }
-
-                // create table; an empty split list is passed as null
-                byte[][] splitKeys = getSplits(sharding);
-                if (splitKeys.length == 0)
-                    splitKeys = null;
-                admin.createTable(tableDesc, splitKeys);
-                if (splitKeys != null) {
-                    for (int i = 0; i < splitKeys.length; i++) {
-                        System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
-                    }
-                }
-                System.out.println("create hbase table " + tableName + " done.");
-            } finally {
-                // Close the admin connection even when a table operation fails.
-                admin.close();
-            }
-
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    /**
-     * Sets the compression type of the column family from the configured codec name.
-     * Names other than snappy, lzo, gz/gzip and lz4 leave the column family's compression untouched.
-     */
-    private void applyCompression(HColumnDescriptor cf, String codecName) {
-        // Locale.ROOT keeps the match independent of the JVM default locale
-        // (e.g. "GZIP" lower-cases to a dotless i under a Turkish locale).
-        String hbaseDefaultCC = codecName.toLowerCase(java.util.Locale.ROOT);
-
-        switch (hbaseDefaultCC) {
-        case "snappy": {
-            logger.info("hbase will use snappy to compress data");
-            cf.setCompressionType(Compression.Algorithm.SNAPPY);
-            break;
-        }
-        case "lzo": {
-            logger.info("hbase will use lzo to compress data");
-            cf.setCompressionType(Compression.Algorithm.LZO);
-            break;
-        }
-        case "gz":
-        case "gzip": {
-            logger.info("hbase will use gzip to compress data");
-            cf.setCompressionType(Compression.Algorithm.GZ);
-            break;
-        }
-        case "lz4": {
-            logger.info("hbase will use lz4 to compress data");
-            cf.setCompressionType(Compression.Algorithm.LZ4);
-            break;
-        }
-        default: {
-            logger.info("hbase will not user any compression codec to compress data");
-        }
-        }
-    }
-
-    /**
-     * Builds the region split keys, one region for one shard.
-     *
-     * @param shard number of shards
-     * @return {@code shard - 1} keys holding the values 1..shard-1 written as unsigned numbers of
-     *         {@code IIKeyValueCodec.SHARD_LEN} bytes; an empty array when {@code shard <= 1}
-     */
-    private byte[][] getSplits(int shard) {
-        if (shard <= 1) {
-            // Zero or negative shard counts used to fail with NegativeArraySizeException.
-            return new byte[0][];
-        }
-        byte[][] result = new byte[shard - 1][];
-        for (int i = 1; i < shard; ++i) {
-            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
-            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
-            result[i - 1] = split;
-        }
-        return result;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java
deleted file mode 100644
index a4c1961..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * THIS IS A TAILORED DUPLICATE OF org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI TO AVOID A CYCLIC
- * DEPENDENCY. INVERTED-INDEX CODE IS NOW SPLIT ACROSS kylin-invertedindex AND kylin-storage-hbase.
- * DEFINITELY NEEDS FURTHER REFACTORING.
- * <p>
- * Uploads the locally configured coprocessor jar to the "coprocessor" directory under the HDFS working
- * directory (reusing an identical copy if one is already there) and registers the coprocessor classes on
- * an {@link HTableDescriptor}.
- */
-public class IIDeployCoprocessorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(IIDeployCoprocessorCLI.class);
-
-    public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
-    public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
-    public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
-
-    /**
-     * Best-effort registration of the coprocessors on the given table descriptor. Failures are logged
-     * and swallowed on purpose so that the caller can still create the table without coprocessors.
-     *
-     * @param tableDesc descriptor of the table about to be created
-     */
-    public static void deployCoprocessor(HTableDescriptor tableDesc) {
-        try {
-            initHTableCoprocessor(tableDesc);
-            // getNameAsString(): getName() returns a byte[] and would log an array identity instead of the name
-            logger.info("hbase table " + tableDesc.getNameAsString() + " deployed with coprocessor.");
-
-        } catch (Exception ex) {
-            logger.error("Error deploying coprocessor on " + tableDesc.getNameAsString(), ex);
-            logger.error("Will try creating the table without coprocessor.");
-        }
-    }
-
-    /** Uploads the configured local coprocessor jar (if needed) and adds the coprocessors to {@code desc}. */
-    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-
-        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
-        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
-        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-    }
-
-    /** Registers the II endpoint, cube endpoint and cube observer with priorities 1000, 1001 and 1002. */
-    private static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Add coprocessor on " + desc.getNameAsString());
-        desc.addCoprocessor(IIEndpointClass, hdfsCoprocessorJar, 1000, null);
-        desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
-        desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
-    }
-
-    /**
-     * Makes sure a copy of the local coprocessor jar exists in the HDFS coprocessor directory.
-     *
-     * @param localCoprocessorJar path of the jar on the local file system
-     * @param fileSystem          target file system
-     * @param oldJarPaths         collects the paths of other jars found in the directory; may be null
-     * @return qualified path of the existing or newly uploaded jar
-     * @throws IOException if listing the directory or uploading (including closing the upload) fails
-     */
-    private static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
-        Path uploadPath = null;
-        File localCoprocessorFile = new File(localCoprocessorJar);
-
-        // check existing jars
-        if (oldJarPaths == null) {
-            oldJarPaths = new HashSet<String>();
-        }
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (isSame(localCoprocessorFile, fileStatus)) {
-                uploadPath = fileStatus.getPath();
-                break;
-            }
-            String filename = fileStatus.getPath().toString();
-            if (filename.endsWith(".jar")) {
-                oldJarPaths.add(filename);
-            }
-        }
-
-        // upload if not existing
-        if (uploadPath == null) {
-            // figure out a unique new jar file name
-            Set<String> oldJarNames = new HashSet<String>();
-            for (String path : oldJarPaths) {
-                oldJarNames.add(new Path(path).getName());
-            }
-            String baseName = getBaseFileName(localCoprocessorJar);
-            String newName = null;
-            int i = 0;
-            while (newName == null) {
-                newName = baseName + "-" + (i++) + ".jar";
-                if (oldJarNames.contains(newName))
-                    newName = null;
-            }
-
-            // upload
-            uploadPath = new Path(coprocessorDir, newName);
-            FileInputStream in = null;
-            FSDataOutputStream out = null;
-            try {
-                in = new FileInputStream(localCoprocessorFile);
-                out = fileSystem.create(uploadPath);
-                IOUtils.copy(in, out);
-                // Close explicitly on the success path so a failed flush/close is reported
-                // instead of being swallowed by closeQuietly below.
-                out.close();
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-
-            // Stamp the copy with the local modification time so isSame() recognizes it next time;
-            // -1 leaves the access time unset.
-            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
-        }
-
-        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
-        return uploadPath;
-    }
-
-    /** A remote file counts as the same jar when both length and modification time match the local file. */
-    private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
-        return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
-    }
-
-    /** Returns the file name of the given jar path without a trailing ".jar". */
-    private static String getBaseFileName(String localCoprocessorJar) {
-        File localJar = new File(localCoprocessorJar);
-        String baseName = localJar.getName();
-        if (baseName.endsWith(".jar"))
-            baseName = baseName.substring(0, baseName.length() - ".jar".length());
-        return baseName;
-    }
-
-    /** Returns the "coprocessor" directory under the configured HDFS working directory, creating it if needed. */
-    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
-        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
-        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
-        fileSystem.mkdirs(coprocessorDir);
-        return coprocessorDir;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
deleted file mode 100644
index 1f4611b..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * Combiner that removes duplicate values per key: for each key it emits every distinct
- * byte sequence seen among the incoming {@link Text} values exactly once.
- *
- * @author yangli9
- */
-public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
-
-    // Reused for every emitted record.
-    private Text outputValue = new Text();
-
-    /** Binds the job configuration to the current thread context. */
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-    }
-
-    /**
-     * Collects the distinct values of {@code key} and writes each one back under the same key.
-     */
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        final HashSet<ByteArray> distinct = new HashSet<ByteArray>();
-
-        for (Text text : values) {
-            // Copy only the valid range of the Text's backing buffer before storing it in the set.
-            final byte[] copy = Bytes.copy(text.getBytes(), 0, text.getLength());
-            distinct.add(new ByteArray(copy));
-        }
-
-        for (ByteArray bytes : distinct) {
-            outputValue.set(bytes.array(), bytes.offset(), bytes.length());
-            context.write(key, outputValue);
-        }
-    }
-
-}