You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:46:39 UTC
[18/51] [partial] incubator-kylin git commit: migrate repo from
github.com to apache git
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
new file mode 100644
index 0000000..022963d
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc;
+
+/**
+ * Bulk-loads the HFiles under the input path into the named HBase table using
+ * LoadIncrementalHFiles and, when the load succeeds, marks the cube's first
+ * segment READY with that table as its storage location.
+ *
+ * @author ysong1
+ */
+public class IIBulkLoadJob extends AbstractHadoopJob {
+
+    /**
+     * Runs the bulk load followed by the cube metadata update.
+     *
+     * @param args command-line arguments carrying the input path, HTable name
+     *            and cube name options
+     * @return the exit code of LoadIncrementalHFiles, or 2 if any step threw
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+
+            // Make the column-family directory under the input path
+            // world-accessible (0777) before the bulk load runs.
+            FileSystem fs = FileSystem.get(getConf());
+            FsPermission permission = new FsPermission((short) 0777);
+            fs.setPermission(new Path(input, InvertedIndexDesc.HBASE_FAMILY), permission);
+
+            int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
+            if (hbaseExitCode != 0) {
+                // A failed load must not publish the segment as READY.
+                return hbaseExitCode;
+            }
+
+            CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = mgr.getCube(cubeName);
+            CubeSegment seg = cube.getFirstSegment();
+            seg.setStorageLocationIdentifier(tableName);
+            seg.setStatus(CubeSegmentStatusEnum.READY);
+            mgr.updateCube(cube);
+
+            return hbaseExitCode;
+
+        } catch (Exception e) {
+            printUsage(options);
+            e.printStackTrace(System.err);
+            return 2;
+        }
+    }
+
+    /** Command-line entry point; exits the JVM with the job's exit code. */
+    public static void main(String[] args) throws Exception {
+        IIBulkLoadJob job = new IIBulkLoadJob();
+        job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
new file mode 100644
index 0000000..0c1afd0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.File;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * MapReduce job that reads sequence-file input through IICreateHFileMapper and
+ * writes HFiles configured for an existing HBase table via
+ * HFileOutputFormat.configureIncrementalLoad.
+ *
+ * @author yangli9
+ */
+public class IICreateHFileJob extends AbstractHadoopJob {
+
+    protected static final Logger log = LoggerFactory.getLogger(IICreateHFileJob.class);
+
+    /**
+     * Configures and runs the HFile-generation job.
+     *
+     * @param args command-line arguments carrying the job name, cube name,
+     *            input path, output path and HTable name options
+     * @return the result of waitForCompletion, or 2 if setup or execution threw
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            // Use the configured Kylin job jar when that file exists locally;
+            // otherwise fall back to the jar that contains this class.
+            String jobJarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
+            if (new File(jobJarPath).exists()) {
+                job.setJar(jobJarPath);
+            } else {
+                job.setJarByClass(this.getClass());
+            }
+
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+            FileOutputFormat.setOutputPath(job, output);
+
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(IICreateHFileMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            HTable htable = new HTable(getConf(), tableName);
+            try {
+                HFileOutputFormat.configureIncrementalLoad(job, htable);
+            } finally {
+                // Release the table handle once the job has been configured from it.
+                htable.close();
+            }
+
+            // Clear the output path before the job is submitted.
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+    }
+
+    /** Command-line entry point; exits the JVM with the job's exit code. */
+    public static void main(String[] args) throws Exception {
+        IICreateHFileJob job = new IICreateHFileJob();
+        job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
new file mode 100644
index 0000000..2ceaa1c
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import static com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Mapper that turns every input key/value pair into an HBase Put KeyValue: the
+ * input key supplies the row, the input value supplies the cell value, and the
+ * family and qualifier come from the InvertedIndexDesc constants.
+ *
+ * @author yangli9
+ */
+public class IICreateHFileMapper extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
+
+    // Set once in setup() and stamped on every KeyValue this mapper emits.
+    long timestamp;
+
+    /** Records the current wall-clock time as the timestamp for all emitted cells. */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        timestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * Emits the input key unchanged together with a Put KeyValue built from the
+     * key's and value's byte ranges.
+     */
+    @Override
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+        final byte[] rowBuffer = key.get();
+        final int rowOffset = key.getOffset();
+        final int rowLength = key.getLength();
+
+        final byte[] valueBuffer = value.get();
+        final int valueOffset = value.getOffset();
+        final int valueLength = value.getLength();
+
+        KeyValue cell = new KeyValue(rowBuffer, rowOffset, rowLength,
+                HBASE_FAMILY_BYTES, 0, HBASE_FAMILY_BYTES.length,
+                HBASE_QUALIFIER_BYTES, 0, HBASE_QUALIFIER_BYTES.length,
+                timestamp, Type.Put,
+                valueBuffer, valueOffset, valueLength);
+
+        context.write(key, cell);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
new file mode 100644
index 0000000..04fd274
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.invertedindex.IIKeyValueCodec;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc;
+
+/**
+ * Creates the HBase table for an inverted index, dropping any existing table
+ * of the same name first and pre-splitting the new table by shard.
+ *
+ * @author George Song (ysong1)
+ */
+public class IICreateHTableJob extends AbstractHadoopJob {
+
+    /**
+     * Reads the cube's inverted-index sharding, builds a table descriptor with
+     * one LZO-compressed, FAST_DIFF-encoded column family that keeps a single
+     * version, then drops and re-creates the table.
+     *
+     * @param args command-line arguments carrying the cube name and HTable
+     *            name options
+     * @return 0 on success, 2 if any step threw
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+
+            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            int sharding = cubeInstance.getInvertedIndexDesc().getSharding();
+
+            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+            HColumnDescriptor cf = new HColumnDescriptor(InvertedIndexDesc.HBASE_FAMILY);
+            cf.setMaxVersions(1);
+            cf.setCompressionType(Algorithm.LZO);
+            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+            tableDesc.addFamily(cf);
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            HBaseAdmin admin = new HBaseAdmin(conf);
+            try {
+                // drop the table first
+                if (admin.tableExists(tableName)) {
+                    admin.disableTable(tableName);
+                    admin.deleteTable(tableName);
+                }
+
+                // create table; a null split array is passed when there are no split keys
+                byte[][] splitKeys = getSplits(sharding);
+                if (splitKeys.length == 0) {
+                    splitKeys = null;
+                }
+                admin.createTable(tableDesc, splitKeys);
+                if (splitKeys != null) {
+                    for (int i = 0; i < splitKeys.length; i++) {
+                        System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
+                    }
+                }
+                System.out.println("create hbase table " + tableName + " done.");
+            } finally {
+                // Close the admin connection even when a drop/create call fails.
+                admin.close();
+            }
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            e.printStackTrace(System.err);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+    }
+
+    /**
+     * Builds the split keys for the given shard count: one key per shard
+     * boundary 1..shard-1, each encoded as an unsigned value of
+     * IIKeyValueCodec.SHARD_LEN bytes (one region for one shard).
+     *
+     * @param shard number of shards; values below 2 yield no split keys
+     * @return shard - 1 split keys, or an empty array when shard is below 2
+     */
+    private byte[][] getSplits(int shard) {
+        // Clamp at zero so a sharding value below 1 cannot request a negative array size.
+        int splitCount = Math.max(shard - 1, 0);
+        byte[][] result = new byte[splitCount][];
+        for (int i = 1; i < shard; ++i) {
+            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+    /** Command-line entry point; exits the JVM with the job's exit code. */
+    public static void main(String[] args) throws Exception {
+        IICreateHTableJob job = new IICreateHTableJob();
+        job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
new file mode 100644
index 0000000..83219d2
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.kylinolap.common.util.ByteArray;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsCombiner extends Reducer<ShortWritable, Text, ShortWritable, Text> {
+
+ private Text outputValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ }
+
+ @Override
+ public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+ HashSet<ByteArray> set = new HashSet<ByteArray>();
+ for (Text textValue : values) {
+ ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+ set.add(value);
+ }
+
+ for (ByteArray value : set) {
+ outputValue.set(value.data);
+ context.write(key, outputValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
new file mode 100644
index 0000000..bc12db2
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.schema.ColumnDesc;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsJob extends AbstractHadoopJob {
+ protected static final Logger log = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_TABLE_NAME);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_INPUT_FORMAT);
+ options.addOption(OPTION_INPUT_DELIM);
+ options.addOption(OPTION_OUTPUT_PATH);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
+ Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+ String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
+ String inputDelim = getOptionValue(OPTION_INPUT_DELIM);
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+ // ----------------------------------------------------------------------------
+
+ System.out.println("Starting: " + job.getJobName());
+
+ setupMapInput(input, inputFormat, inputDelim);
+ setupReduceOutput(output);
+
+ // pass table and columns
+ MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ TableDesc table = metaMgr.getTableDesc(tableName);
+ job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
+ job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(table));
+
+ return waitForCompletion(job);
+
+ } catch (Exception e) {
+ printUsage(options);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ }
+
+ }
+
+ private String getColumns(TableDesc table) {
+ StringBuilder buf = new StringBuilder();
+ for (ColumnDesc col : table.getColumns()) {
+ if (buf.length() > 0)
+ buf.append(",");
+ buf.append(col.getName());
+ }
+ return buf.toString();
+ }
+
+ private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException {
+ FileInputFormat.setInputPaths(job, input);
+
+ File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+ if (JarFile.exists()) {
+ job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+ } else {
+ job.setJarByClass(this.getClass());
+ }
+
+ if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
+ job.setInputFormatClass(TextInputFormat.class);
+ } else {
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ }
+
+ if ("t".equals(inputDelim)) {
+ inputDelim = "\t";
+ } else if ("177".equals(inputDelim)) {
+ inputDelim = "\177";
+ }
+ if (inputDelim != null) {
+ job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
+ }
+
+ job.setMapperClass(IIDistinctColumnsMapper.class);
+ job.setCombinerClass(IIDistinctColumnsCombiner.class);
+ job.setMapOutputKeyClass(ShortWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ }
+
+ private void setupReduceOutput(Path output) throws IOException {
+ job.setReducerClass(IIDistinctColumnsReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ FileOutputFormat.setOutputPath(job, output);
+ job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+ job.setNumReduceTasks(1);
+
+ deletePath(job.getConfiguration(), output);
+ }
+
+ public static void main(String[] args) throws Exception {
+ IIDistinctColumnsJob job = new IIDistinctColumnsJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
new file mode 100644
index 0000000..b7456bf
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.kylinolap.cube.common.BytesSplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * Mapper that splits each input record on a delimiter and emits one
+ * (column index, column bytes) pair per field.
+ *
+ * @author yangli9
+ */
+public class IIDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, Text, ShortWritable, Text> {
+
+    private String[] columns;
+    // Delimiter code point; -1 until configured or detected from the first record.
+    private int delim;
+    private BytesSplitter splitter;
+
+    private ShortWritable outputKey = new ShortWritable();
+    private Text outputValue = new Text();
+
+    /**
+     * Reads the comma-separated column list and the optional input delimiter
+     * from the job configuration and creates the splitter.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
+        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
+        this.splitter = new BytesSplitter(200, 4096);
+    }
+
+    /**
+     * Splits the record and writes every field keyed by its column index.
+     *
+     * @throws RuntimeException if the number of fields differs from the number
+     *             of configured columns
+     */
+    @Override
+    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+        if (delim == -1) {
+            // No delimiter configured: let the splitter detect one from this record.
+            delim = splitter.detectDelim(value, columns.length);
+        }
+
+        int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
+        SplittedBytes[] parts = splitter.getSplitBuffers();
+
+        if (nParts != columns.length) {
+            // Report nParts, the count that was actually compared, not the size of the buffer array.
+            throw new RuntimeException("Got " + nParts + " from -- " + value.toString() + " -- but only " + columns.length + " expected");
+        }
+
+        for (short i = 0; i < nParts; i++) {
+            outputKey.set(i);
+            outputValue.set(parts[i].value, 0, parts[i].length);
+            context.write(outputKey, outputValue);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
new file mode 100644
index 0000000..f170057
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.kylinolap.common.util.ByteArray;
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsReducer extends Reducer<ShortWritable, Text, NullWritable, Text> {
+
+ private String[] columns;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+ }
+
+ @Override
+ public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ String columnName = columns[key.get()];
+
+ HashSet<ByteArray> set = new HashSet<ByteArray>();
+ for (Text textValue : values) {
+ ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+ set.add(value);
+ }
+
+ Configuration conf = context.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+ FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
+
+ try {
+ for (ByteArray value : set) {
+ out.write(value.data);
+ out.write('\n');
+ }
+ } finally {
+ out.close();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
new file mode 100644
index 0000000..6681db6
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * Command-line Hadoop job that configures and runs the inverted index build:
+ * {@link InvertedIndexMapper} feeds {@link InvertedIndexReducer} through
+ * {@link InvertedIndexPartitioner}, and the result is written as a sequence file.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexJob extends AbstractHadoopJob {
+    protected static final Logger log = LoggerFactory.getLogger(InvertedIndexJob.class);
+
+    /**
+     * Parses the command-line options, configures the job and waits for it to
+     * finish.
+     *
+     * @param args command-line arguments (job name, cube name, input path,
+     *             input format, input delimiter, output path)
+     * @return the result of {@code waitForCompletion}, or 2 when any exception
+     *         is raised while setting up or running the job
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_INPUT_FORMAT);
+            options.addOption(OPTION_INPUT_DELIM);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
+            String inputDelim = getOptionValue(OPTION_INPUT_DELIM);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            System.out.println("Starting: " + job.getJobName());
+
+            CubeInstance cube = getCube(cubeName);
+
+            setupMapInput(input, inputFormat, inputDelim);
+            setupReduceOutput(output, cube.getInvertedIndexDesc().getSharding());
+            attachMetadata(cube);
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            // every failure, not only bad arguments, ends up here and prints the usage text
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+
+    }
+
+    /**
+     * Looks up a cube by name using the environment's Kylin configuration.
+     *
+     * @param cubeName name of the cube to load
+     * @return the cube, never null
+     * @throws IllegalArgumentException if no cube with that name exists
+     */
+    private CubeInstance getCube(String cubeName) {
+        CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = mgr.getCube(cubeName);
+        if (cube == null)
+            throw new IllegalArgumentException("No Inverted Index Cube found by name " + cubeName);
+        return cube;
+    }
+
+    /**
+     * Attaches the Kylin properties and metadata to the job configuration and
+     * records the cube name and the name of the cube's first segment, which the
+     * mapper, partitioner and reducer read back in their setup.
+     */
+    private void attachMetadata(CubeInstance cube) throws IOException {
+
+        Configuration conf = job.getConfiguration();
+        attachKylinPropsAndMetadata(cube, conf);
+
+        CubeSegment seg = cube.getFirstSegment();
+        conf.set(BatchConstants.CFG_CUBE_NAME, cube.getName());
+        conf.set(BatchConstants.CFG_CUBE_SEGMENT_NAME, seg.getName());
+    }
+
+    /**
+     * Configures the map side of the job: input path, job jar, input format,
+     * field delimiter and the mapper/partitioner classes.
+     *
+     * @param input       input path of the job
+     * @param inputFormat "textinputformat" or "text" (case-insensitive) selects
+     *                    text input; anything else selects sequence file input
+     * @param inputDelim  field delimiter; "t" stands for a tab and "177" for the
+     *                    character written as octal escape 177; when null no
+     *                    delimiter is put into the configuration
+     */
+    private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException {
+        FileInputFormat.setInputPaths(job, input);
+
+        // use the configured job jar when it exists locally, otherwise the jar containing this class
+        String jobJarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
+        File jarFile = new File(jobJarPath);
+        if (jarFile.exists()) {
+            job.setJar(jobJarPath);
+        } else {
+            job.setJarByClass(this.getClass());
+        }
+
+        if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
+            job.setInputFormatClass(TextInputFormat.class);
+        } else {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        // translate the command-line shorthands into the actual delimiter character
+        if ("t".equals(inputDelim)) {
+            inputDelim = "\t";
+        } else if ("177".equals(inputDelim)) {
+            inputDelim = "\177";
+        }
+        if (inputDelim != null) {
+            job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
+        }
+
+        job.setMapperClass(InvertedIndexMapper.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
+        job.setPartitionerClass(InvertedIndexPartitioner.class);
+    }
+
+    /**
+     * Configures the reduce side of the job and removes any existing output.
+     *
+     * @param output   output path; deleted via {@code deletePath} before the job runs
+     * @param sharding number of reduce tasks to use
+     */
+    private void setupReduceOutput(Path output, short sharding) throws IOException {
+        job.setReducerClass(InvertedIndexReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(ImmutableBytesWritable.class);
+
+        job.setNumReduceTasks(sharding);
+
+        FileOutputFormat.setOutputPath(job, output);
+
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    /**
+     * Runs the job through {@link ToolRunner} and exits with its return code.
+     */
+    public static void main(String[] args) throws Exception {
+        InvertedIndexJob job = new InvertedIndexJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
new file mode 100644
index 0000000..f555c40
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.common.BytesSplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.cube.invertedindex.TableRecord;
+import com.kylinolap.cube.invertedindex.TableRecordInfo;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * Mapper that splits each input line into columns, loads them into a
+ * {@link TableRecord} and emits the record's bytes keyed by its timestamp.
+ *
+ * @author yangli9
+ *
+ */
+public class InvertedIndexMapper<KEYIN> extends Mapper<KEYIN, Text, LongWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    // delimiter code point; -1 means "not configured, detect it from the first line"
+    private int delim;
+    private BytesSplitter splitter;
+
+    private LongWritable outputKey;
+    private ImmutableBytesWritable outputValue;
+
+    /**
+     * Reads the delimiter from the configuration and loads the cube segment
+     * named in the configuration to build the record layout.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
+        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
+        this.splitter = new BytesSplitter(200, 4096);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        CubeManager mgr = CubeManager.getInstance(config);
+        CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
+        this.info = new TableRecordInfo(seg);
+        this.rec = new TableRecord(this.info);
+
+        outputKey = new LongWritable();
+        // wraps the record's byte array once; it is reused for every output
+        outputValue = new ImmutableBytesWritable(rec.getBytes());
+    }
+
+    /**
+     * Splits one line by the delimiter, fills the record column by column and
+     * writes it out keyed by the record's timestamp.
+     *
+     * @throws RuntimeException if the number of parts in the line differs from
+     *                          the number of columns of the record layout
+     */
+    @Override
+    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+        if (delim == -1) {
+            delim = splitter.detectDelim(value, info.getColumnCount());
+        }
+
+        int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
+        SplittedBytes[] parts = splitter.getSplitBuffers();
+
+        if (nParts != info.getColumnCount()) {
+            // report the number of parts found in this line, not the size of the splitter's buffer array
+            throw new RuntimeException("Got " + nParts + " from -- " + value.toString() + " -- but only " + info.getColumnCount() + " expected");
+        }
+
+        rec.reset();
+        for (int i = 0; i < nParts; i++) {
+            rec.setValueString(i, Bytes.toString(parts[i].value, 0, parts[i].length));
+        }
+
+        outputKey.set(rec.getTimestamp());
+        // outputValue's backing bytes array is the same as rec
+
+        context.write(outputKey, outputValue);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
new file mode 100644
index 0000000..bd06d74
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.invertedindex.TableRecord;
+import com.kylinolap.cube.invertedindex.TableRecordInfo;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * Partitioner that routes each record to the partition given by the shard
+ * stored in the record itself.
+ *
+ * @author yangli9
+ *
+ */
+public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
+
+    private Configuration conf;
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    /**
+     * Returns the shard of the record carried in {@code value}.
+     * <p/>
+     * {@code numPartitions} is not consulted, so the shard is returned as is.
+     *
+     * @param key           map output key, unused
+     * @param value         serialized record bytes
+     * @param numPartitions number of partitions of the job, unused
+     * @return the record's shard
+     */
+    @Override
+    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
+        rec.setBytes(value.get(), value.getOffset(), value.getLength());
+        return rec.getShard();
+    }
+
+    /**
+     * Stores the configuration and loads the cube segment named in it to build
+     * the record layout used by {@link #getPartition}.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} raised while
+     *                          loading the metadata
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        try {
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+            CubeManager mgr = CubeManager.getInstance(config);
+            CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+            CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
+            this.info = new TableRecordInfo(seg);
+            this.rec = new TableRecord(this.info);
+        } catch (IOException e) {
+            // say what failed instead of rethrowing with an empty message
+            throw new RuntimeException("Failed to initialize InvertedIndexPartitioner from the job configuration", e);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
new file mode 100644
index 0000000..09954a1
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.invertedindex.IIKeyValueCodec;
+import com.kylinolap.cube.invertedindex.TableRecord;
+import com.kylinolap.cube.invertedindex.TableRecordInfo;
+import com.kylinolap.cube.invertedindex.Slice;
+import com.kylinolap.cube.invertedindex.SliceBuilder;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * Reducer that appends incoming records to a {@link SliceBuilder} and writes
+ * every completed {@link Slice} as key/value pairs encoded by
+ * {@link IIKeyValueCodec}.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexReducer extends Reducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    // created lazily from the first record seen, because the shard is read from the record
+    private SliceBuilder builder;
+    private IIKeyValueCodec kv;
+
+    /**
+     * Loads the cube segment named in the configuration and prepares the
+     * record and the key/value codec.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        CubeManager mgr = CubeManager.getInstance(config);
+        CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
+        info = new TableRecordInfo(seg);
+        rec = new TableRecord(info);
+        builder = null;
+        kv = new IIKeyValueCodec(info);
+    }
+
+    /**
+     * Appends every record of the group to the slice builder and writes out
+     * each slice the builder returns.
+     */
+    @Override
+    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+            throws IOException, InterruptedException {
+        for (ImmutableBytesWritable v : values) {
+            rec.setBytes(v.get(), v.getOffset(), v.getLength());
+
+            if (builder == null) {
+                builder = new SliceBuilder(info, rec.getShard());
+            }
+            // NOTE(review): prints every record to stdout; looks like debug output
+            System.out.println(rec.getShard() + " - " + rec);
+
+            Slice slice = builder.append(rec);
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    /**
+     * Closes the slice builder and writes the last slice, if any. Does nothing
+     * when this reducer never received a record.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        // builder stays null when reduce() was never called; nothing to flush then
+        if (builder == null) {
+            return;
+        }
+
+        Slice slice = builder.close();
+        if (slice != null) {
+            output(slice, context);
+        }
+    }
+
+    /**
+     * Encodes a slice into key/value pairs and writes each pair to the context.
+     */
+    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+        for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getFirst(), pair.getSecond());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
new file mode 100644
index 0000000..096ae86
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * Hadoop job that samples the keys of sequence file input with
+ * {@link RandomKeyDistributionMapper} and reduces the samples to split keys
+ * with a single {@link RandomKeyDistributionReducer}.
+ * <p/>
+ * The target region count is the total map input size in MB divided by the
+ * "regionmb" option, with a minimum of one.
+ *
+ * @author ysong1
+ *
+ */
+@SuppressWarnings("static-access")
+public class RandomKeyDistributionJob extends AbstractHadoopJob {
+
+    protected static final Logger log = LoggerFactory.getLogger(RandomKeyDistributionJob.class);
+
+    // fully qualified class name of the key type used for map and reduce output
+    static final Option OPTION_KEY_CLASS = OptionBuilder.withArgName("keyclass").hasArg().isRequired(true).withDescription("Key Class").create("keyclass");
+    // size in MB that one region is meant to hold
+    static final Option OPTION_REGION_MB = OptionBuilder.withArgName("regionmb").hasArg().isRequired(true).withDescription("MB per Region").create("regionmb");
+
+    /**
+     * Parses the options, configures the sampling job and waits for it.
+     *
+     * @param args command-line arguments (input path, output path, job name,
+     *             key class, MB per region)
+     * @return the result of {@code waitForCompletion}, or 2 when any exception
+     *         is raised
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_KEY_CLASS);
+            options.addOption(OPTION_REGION_MB);
+
+            parseOptions(options, args);
+
+            // create the job and wire up its input and output locations
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.setJarByClass(this.getClass());
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+            Path outputDir = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, outputDir);
+
+            Class<?> keyType = Class.forName(getOptionValue(OPTION_KEY_CLASS));
+            int mbPerRegion = Integer.parseInt(getOptionValue(OPTION_REGION_MB));
+
+            // map side
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(RandomKeyDistributionMapper.class);
+            job.setMapOutputKeyClass(keyType);
+            job.setMapOutputValueClass(NullWritable.class);
+
+            // reduce side, a single reducer
+            job.setReducerClass(RandomKeyDistributionReducer.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(keyType);
+            job.setOutputValueClass(NullWritable.class);
+            job.setNumReduceTasks(1);
+
+            this.deletePath(job.getConfiguration(), outputDir);
+
+            // derive the region count from the total input size, never less than one
+            double inputMB = this.getTotalMapInputMB();
+            int regions = (int) (inputMB / mbPerRegion);
+            if (regions < 1) {
+                regions = 1;
+            }
+            final int samplesPerMapper = 1000;
+            System.out.println("Total Map Input MB: " + inputMB);
+            System.out.println("Region Count: " + regions);
+
+            // hand the sampling parameters to the mapper and the reducer
+            job.getConfiguration().set(BatchConstants.MAPPER_SAMPLE_NUMBER, String.valueOf(samplesPerMapper));
+            job.getConfiguration().set(BatchConstants.REGION_NUMBER, String.valueOf(regions));
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+    }
+
+    /**
+     * Runs the job through {@link ToolRunner} and exits with its return code.
+     */
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new RandomKeyDistributionJob(), args));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
new file mode 100644
index 0000000..c434f69
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.kylinolap.common.util.RandomSampler;
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * Mapper that keeps a copy of every input key and, when the task finishes,
+ * emits a random sample of those keys. Input values are ignored.
+ * <p/>
+ * The sample size is read from {@link BatchConstants#MAPPER_SAMPLE_NUMBER}.
+ *
+ * @author ysong1
+ *
+ */
+public class RandomKeyDistributionMapper<KEY extends Writable, VALUE> extends Mapper<KEY, VALUE, KEY, NullWritable> {
+
+    private Configuration conf;
+    private int sampleNumber;
+    // copies of all keys seen by this mapper
+    private List<KEY> allKeys;
+
+    /**
+     * Reads the sample size from the configuration and resets the key list.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        this.conf = context.getConfiguration();
+        this.allKeys = new ArrayList<KEY>();
+        this.sampleNumber = Integer.parseInt(this.conf.get(BatchConstants.MAPPER_SAMPLE_NUMBER));
+    }
+
+    /**
+     * Stores a copy of the key; nothing is written to the context here.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException {
+        // a fresh instance is filled from the key, so the list never holds the passed-in object itself
+        KEY snapshot = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
+        ReflectionUtils.copy(conf, key, snapshot);
+        allKeys.add(snapshot);
+    }
+
+    /**
+     * Samples the collected keys and writes each sampled key with a null value.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        List<KEY> picked = new RandomSampler<KEY>().sample(allKeys, sampleNumber);
+        for (KEY sampled : picked) {
+            context.write(sampled, NullWritable.get());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
new file mode 100644
index 0000000..f5475f2
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * Reducer that collects a copy of every key it receives and, when the task
+ * finishes, emits every {@code stepLength}-th key, where the step is the
+ * number of collected keys divided by the region number read from
+ * {@link BatchConstants#REGION_NUMBER}.
+ *
+ * @author ysong1
+ *
+ */
+public class RandomKeyDistributionReducer<KEY extends Writable> extends Reducer<KEY, NullWritable, KEY, NullWritable> {
+
+    private Configuration conf;
+    private int regionNumber;
+    // copies of all keys, in the order reduce() received them
+    private List<KEY> allSplits;
+
+    /**
+     * Reads the region number from the configuration and resets the key list.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        conf = context.getConfiguration();
+        allSplits = new ArrayList<KEY>();
+        regionNumber = Integer.parseInt(conf.get(BatchConstants.REGION_NUMBER));
+    }
+
+    /**
+     * Stores a copy of the key; nothing is written to the context here.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void reduce(KEY key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
+        // a fresh instance is filled from the key, so the list never holds the passed-in object itself
+        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
+        ReflectionUtils.copy(conf, key, keyCopy);
+        allSplits.add(keyCopy);
+    }
+
+    /**
+     * Writes every {@code stepLength}-th collected key, starting at index
+     * {@code stepLength}, each with a null value.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        // guard against a zero or negative region number, which would divide by zero or give a useless step
+        int regions = Math.max(1, regionNumber);
+        // with fewer keys than regions the quotient is 0, and a zero step would never advance the loop
+        int stepLength = Math.max(1, allSplits.size() / regions);
+        for (int i = stepLength; i < allSplits.size(); i += stepLength) {
+            context.write(allSplits.get(i), NullWritable.get());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java b/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
new file mode 100644
index 0000000..28237ca
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
@@ -0,0 +1,418 @@
+package com.kylinolap.job.tools;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.JsonSerializer;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.persistence.Serializer;
+import com.kylinolap.cube.*;
+import com.kylinolap.cube.project.ProjectInstance;
+import com.kylinolap.dict.DictionaryInfo;
+import com.kylinolap.dict.DictionaryManager;
+import com.kylinolap.dict.lookup.SnapshotManager;
+import com.kylinolap.dict.lookup.SnapshotTable;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.schema.TableDesc;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by honma on 9/3/14.
+ * <p/>
+ * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from
+ * dev env to test(prod) env, or vice versa.
+ * <p/>
+ * Note that different envs are assumed to share the same hadoop cluster,
+ * including hdfs, hbase and hive.
+ */
+public class CubeMigrationCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class);
+
+ // Shared state of one migration run; every field below is (re)assigned by moveCube().
+ // Ordered list of planned steps; either printed (dry run) or executed.
+ private static List<Opt> operations;
+ // Configuration and metadata store of the environment the cube is read from.
+ private static KylinConfig srcConfig;
+ private static KylinConfig dstConfig;
+ private static ResourceStore srcStore;
+ // Metadata store of the environment the cube is written to.
+ private static ResourceStore dstStore;
+ // File system handle used for the HDFS folder renames.
+ private static FileSystem hdfsFS;
+ // HBase admin client used to rewrite the HTable descriptors.
+ private static HBaseAdmin hbaseAdmin;
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+
+ if (args.length != 6) {
+ usage();
+ System.exit(1);
+ }
+
+ moveCube(args[0], args[1], args[2], args[3], args[4], args[5]);
+ }
+
+    /**
+     * Prints the command line synopsis followed by a description of each argument
+     * to standard output.
+     */
+    private static void usage() {
+        System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName overwriteIfExists realExecute");
+
+        StringBuilder details = new StringBuilder();
+        details.append(" srcKylinConfigUri: The KylinConfig of the cube’s source \n");
+        details.append("dstKylinConfigUri: The KylinConfig of the cube’s new home \n");
+        details.append("cubeName: the name of cube to be migrated. \n");
+        details.append("projectName: The target project in the target environment.(Make sure it exist) \n");
+        details.append("overwriteIfExists: overwrite cube if it already exists in the target environment. \n");
+        details.append("realExecute: if false, just print the operations to take, if true, do the real migration. \n");
+        System.out.println(details.toString());
+    }
+
+    /**
+     * Plans the migration of one cube from the source to the destination
+     * environment and then either executes the plan or only prints it.
+     *
+     * @param srcCfg
+     *            configuration of the environment the cube currently lives in
+     * @param dstCfg
+     *            configuration of the environment the cube is moved to
+     * @param cubeName
+     *            name of the cube to migrate
+     * @param projectName
+     *            project in the destination environment the cube is added to
+     * @param overwriteIfExists
+     *            "true" (case-insensitive) to allow replacing a cube of the same
+     *            name in the destination metadata store
+     * @param realExecute
+     *            "true" (case-insensitive) to execute the planned operations;
+     *            any other value only logs them
+     * @throws IllegalStateException
+     *             if the cube does not exist, or the cube or one of its segments
+     *             is not in READY state, or a precondition checked while
+     *             planning fails
+     */
+    public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+
+        srcConfig = srcCfg;
+        srcStore = ResourceStore.getStore(srcConfig);
+        dstConfig = dstCfg;
+        dstStore = ResourceStore.getStore(dstConfig);
+
+        CubeManager cubeManager = CubeManager.getInstance(srcConfig);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        logger.info("cube to be moved is : " + cubeName);
+
+        // Fail with a clear message instead of a NullPointerException below.
+        if (cube == null)
+            throw new IllegalStateException("Cube " + cubeName + " is not found in the source metadata store");
+
+        if (cube.getStatus() != CubeStatusEnum.READY)
+            throw new IllegalStateException("Cannot migrate cube that is not in READY state.");
+
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getStatus() != CubeSegmentStatusEnum.READY) {
+                throw new IllegalStateException("At least one segment is not in READY state");
+            }
+        }
+
+        checkAndGetHbaseUrl();
+
+        Configuration conf = HBaseConfiguration.create();
+        hbaseAdmin = new HBaseAdmin(conf);
+        try {
+            hdfsFS = FileSystem.get(new Configuration());
+
+            operations = new ArrayList<Opt>();
+
+            // The order of these calls defines the execution order of the plan.
+            copyFilesInMetaStore(cube, overwriteIfExists);
+            renameFoldersInHdfs(cube);
+            changeHtableHost(cube);
+            addCubeIntoProject(cubeName, projectName);
+
+            if (realExecute.equalsIgnoreCase("true")) {
+                doOpts();
+            } else {
+                showOpts();
+            }
+        } finally {
+            // Release the admin connection whether the migration succeeded or not.
+            hbaseAdmin.close();
+        }
+    }
+
+ public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+
+ moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, overwriteIfExists, realExecute);
+ }
+
+ private static String checkAndGetHbaseUrl() {
+ String srcMetadataUrl = srcConfig.getMetadataUrl();
+ String dstMetadataUrl = dstConfig.getMetadataUrl();
+
+ logger.info("src metadata url is " + srcMetadataUrl);
+ logger.info("dst metadata url is " + dstMetadataUrl);
+
+ int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase:");
+ int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase:");
+ if (srcIndex < 0 || dstIndex < 0)
+ throw new IllegalStateException("Both metadata urls should be hbase metadata url");
+
+ String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim();
+ String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim();
+ if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) {
+ throw new IllegalStateException("hbase url not equal! ");
+ }
+
+ logger.info("hbase url is " + srcHbaseUrl.trim());
+ return srcHbaseUrl.trim();
+ }
+
+    /**
+     * Plans one RENAME_FOLDER_IN_HDFS operation per segment, moving the working
+     * directory of the segment's last build job from the source to the
+     * destination HDFS working directory.
+     */
+    private static void renameFoldersInHdfs(CubeInstance cube) {
+        String srcWorkingDir = srcConfig.getHdfsWorkingDirectory();
+        String dstWorkingDir = dstConfig.getHdfsWorkingDirectory();
+
+        for (CubeSegment seg : cube.getSegments()) {
+            String buildJobId = seg.getLastBuildJobID();
+            String from = JobInstance.getJobWorkingDir(buildJobId, srcWorkingDir);
+            String to = JobInstance.getJobWorkingDir(buildJobId, dstWorkingDir);
+
+            Object[] renameArgs = { from, to };
+            operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, renameArgs));
+        }
+    }
+
+    /**
+     * Plans one CHANGE_HTABLE_HOST operation per segment, keyed by the segment's
+     * storage location identifier.
+     */
+    private static void changeHtableHost(CubeInstance cube) {
+        for (CubeSegment seg : cube.getSegments()) {
+            Object[] hostArgs = { seg.getStorageLocationIdentifier() };
+            operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, hostArgs));
+        }
+    }
+
+    /**
+     * Plans the metadata copy operations for the cube: one COPY_FILE_IN_META
+     * per plain metadata resource and one COPY_DICT_OR_SNAPSHOT per dictionary
+     * or snapshot path.
+     *
+     * @throws IllegalStateException
+     *             if the cube already exists in the destination store and
+     *             {@code overwriteIfExists} is not "true" (case-insensitive)
+     */
+    private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException {
+
+        List<String> plainResources = new ArrayList<String>();
+        List<String> dictsAndSnapshots = new ArrayList<String>();
+        listCubeRelatedResources(cube, plainResources, dictsAndSnapshots);
+
+        if (dstStore.exists(cube.getResourcePath())) {
+            boolean overwriteAllowed = overwriteIfExists.equalsIgnoreCase("true");
+            if (!overwriteAllowed) {
+                throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
+            }
+        }
+
+        for (int i = 0; i < plainResources.size(); i++) {
+            Object[] copyArgs = { plainResources.get(i) };
+            operations.add(new Opt(OptType.COPY_FILE_IN_META, copyArgs));
+        }
+
+        for (int i = 0; i < dictsAndSnapshots.size(); i++) {
+            Object[] copyArgs = { dictsAndSnapshots.get(i), cube.getName() };
+            operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, copyArgs));
+        }
+    }
+
+    /**
+     * Plans the ADD_INTO_PROJECT operation after checking that the target
+     * project exists in the destination metadata store.
+     *
+     * @throws IllegalStateException
+     *             if the project resource is missing in the destination store
+     */
+    private static void addCubeIntoProject(String cubeName, String projectName) throws IOException {
+        String projectResPath = ProjectInstance.concatResourcePath(projectName);
+        if (!dstStore.exists(projectResPath))
+            // Note the leading space: the project name must not run into the text.
+            throw new IllegalStateException("The target project " + projectName + " does not exist");
+
+        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { cubeName, projectName }));
+    }
+
+    /**
+     * Collects the resource paths that belong to a cube.
+     *
+     * @param cube
+     *            the cube to inspect
+     * @param metaResource
+     *            receives the paths of the cube, its descriptor and its tables
+     * @param dictAndSnapshot
+     *            receives, per segment, the snapshot paths followed by the
+     *            dictionary paths
+     */
+    private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, List<String> dictAndSnapshot) throws IOException {
+
+        CubeDesc descriptor = cube.getDescriptor();
+
+        String cubePath = cube.getResourcePath();
+        metaResource.add(cubePath);
+        String descriptorPath = descriptor.getResourcePath();
+        metaResource.add(descriptorPath);
+
+        for (TableDesc table : descriptor.listTables()) {
+            String tablePath = table.getResourcePath();
+            metaResource.add(tablePath);
+        }
+
+        for (CubeSegment seg : cube.getSegments()) {
+            // Snapshots first, then dictionaries, for each segment in turn.
+            dictAndSnapshot.addAll(seg.getSnapshotPaths());
+            dictAndSnapshot.addAll(seg.getDictionaryPaths());
+        }
+    }
+
+    /** Kinds of steps a migration plan can contain. */
+    private static enum OptType {
+        /** Copy one metadata resource from the source to the destination store. */
+        COPY_FILE_IN_META,
+        /** Copy one dictionary or snapshot, reusing a duplicate if the destination reports one. */
+        COPY_DICT_OR_SNAPSHOT,
+        /** Rename one job working folder in HDFS. */
+        RENAME_FOLDER_IN_HDFS,
+        /** Register the cube in the destination project. */
+        ADD_INTO_PROJECT,
+        /** Rewrite the metadata url prefix stored on one HTable descriptor. */
+        CHANGE_HTABLE_HOST
+    }
+
+    /** One planned migration step: an operation type plus its positional arguments. */
+    private static class Opt {
+        private OptType type;
+        private Object[] params;
+
+        private Opt(OptType type, Object[] params) {
+            this.type = type;
+            this.params = params;
+        }
+
+        /**
+         * Renders the step as {@code TYPE:arg, arg, } (each argument is followed
+         * by a comma and a space, including the last one).
+         */
+        public String toString() {
+            StringBuilder text = new StringBuilder();
+            text.append(type);
+            text.append(":");
+            for (int i = 0; i < params.length; i++) {
+                text.append(params[i]);
+                text.append(", ");
+            }
+            return text.toString();
+        }
+
+    }
+
+    /** Logs every planned operation, in plan order, without executing anything. */
+    private static void showOpts() {
+        for (Opt planned : operations) {
+            showOpt(planned);
+        }
+    }
+
+    /** Logs a single planned operation at info level. */
+    private static void showOpt(Opt opt) {
+        String description = opt.toString();
+        logger.info("Operation: " + description);
+    }
+
+    /**
+     * Executes the planned operations in order. If one fails, the failed
+     * operation and every operation before it are undone in reverse order
+     * (best effort: undo failures are logged and skipped).
+     *
+     * @throws RuntimeException
+     *             wrapping the original failure, after the undo pass has run
+     */
+    private static void doOpts() throws IOException, InterruptedException {
+        int index = 0;
+        try {
+            for (; index < operations.size(); ++index) {
+                logger.info("Operation index :" + index);
+                doOpt(operations.get(index));
+            }
+        } catch (Exception e) {
+            logger.error("error met", e);
+            logger.info("Try undoing previous changes");
+            // undo: start at the failed operation itself, it may have been partially applied
+            for (int i = index; i >= 0; --i) {
+                try {
+                    undo(operations.get(i));
+                } catch (Exception ee) {
+                    // Log the undo failure itself, not the original error again.
+                    logger.error("error met ", ee);
+                    logger.info("Continue undoing...");
+                }
+            }
+
+            // Keep the root cause attached so callers can see why the move failed.
+            throw new RuntimeException("Cube moving failed", e);
+        }
+    }
+
+ private static void doOpt(Opt opt) throws IOException, InterruptedException {
+ logger.info("Executing operation: " + opt.toString());
+
+ switch (opt.type) {
+ case CHANGE_HTABLE_HOST: {
+ String tableName = (String) opt.params[0];
+ HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ hbaseAdmin.disableTable(tableName);
+ desc.setValue(CubeManager.getHtableMetadataKey(), dstConfig.getMetadataUrlPrefix());
+ hbaseAdmin.modifyTable(tableName, desc);
+ hbaseAdmin.enableTable(tableName);
+ logger.info("CHANGE_HTABLE_HOST is completed");
+ break;
+ }
+ case COPY_FILE_IN_META: {
+ String item = (String) opt.params[0];
+ InputStream inputStream = srcStore.getResource(item);
+ long ts = srcStore.getResourceTimestamp(item);
+ dstStore.putResource(item, inputStream, ts);
+ inputStream.close();
+ logger.info("Item " + item + " is copied");
+ break;
+ }
+ case COPY_DICT_OR_SNAPSHOT: {
+ String item = (String) opt.params[0];
+
+ if (item.toLowerCase().endsWith(".dict")) {
+ DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
+ DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
+ DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
+
+ long ts = dictSrc.getLastModified();
+ dictSrc.setLastModified(0);//to avoid resource store write conflict
+ DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc);
+ dictSrc.setLastModified(ts);
+
+ if (dictSaved == dictSrc) {
+ //no dup found, already saved to dest
+ logger.info("Item " + item + " is copied");
+ } else {
+ //dictSrc is rejected because of duplication
+ //modify cube's dictionary path
+ String cubeName = (String) opt.params[1];
+ String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+ for (CubeSegment segment : cube.getSegments()) {
+ for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
+ if (entry.getValue().equalsIgnoreCase(item)) {
+ entry.setValue(dictSaved.getResourcePath());
+ }
+ }
+ }
+ dstStore.putResource(cubeResPath, cube, cubeSerializer);
+ logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
+ }
+
+ } else if (item.toLowerCase().endsWith(".snapshot")) {
+ SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
+ SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
+ SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
+
+ long ts = snapSrc.getLastModified();
+ snapSrc.setLastModified(0);
+ SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
+ snapSrc.setLastModified(ts);
+
+
+ if (snapSaved == snapSrc) {
+ //no dup found, already saved to dest
+ logger.info("Item " + item + " is copied");
+
+ } else {
+ String cubeName = (String) opt.params[1];
+ String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+ for (CubeSegment segment : cube.getSegments()) {
+ for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
+ if (entry.getValue().equalsIgnoreCase(item)) {
+ entry.setValue(snapSaved.getResourcePath());
+ }
+ }
+ }
+ dstStore.putResource(cubeResPath, cube, cubeSerializer);
+ logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
+
+ }
+
+ } else {
+ logger.error("unknown item found: " + item);
+ logger.info("ignore it");
+ }
+
+ break;
+ }
+ case RENAME_FOLDER_IN_HDFS: {
+ String srcPath = (String) opt.params[0];
+ String dstPath = (String) opt.params[1];
+ hdfsFS.rename(new Path(srcPath), new Path(dstPath));
+ logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+ break;
+ }
+ case ADD_INTO_PROJECT: {
+ String cubeName = (String) opt.params[0];
+ String projectName = (String) opt.params[1];
+ String projectResPath = ProjectInstance.concatResourcePath(projectName);
+ Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+ ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
+ project.removeCube(cubeName);
+ project.addCube(cubeName);
+ dstStore.putResource(projectResPath, project, projectSerializer);
+ logger.info("Project instance for " + projectName + " is corrected");
+ break;
+ }
+ }
+ }
+
+    /**
+     * Reverts a single operation where a revert is defined: the HTable metadata
+     * key is set back to the source prefix and an HDFS rename is reversed. The
+     * remaining operation types are only logged as ignored.
+     */
+    private static void undo(Opt opt) throws IOException, InterruptedException {
+        logger.info("Undo operation: " + opt.toString());
+
+        switch (opt.type) {
+        case CHANGE_HTABLE_HOST: {
+            String htable = (String) opt.params[0];
+            HTableDescriptor descriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(htable));
+            hbaseAdmin.disableTable(htable);
+            String sourcePrefix = srcConfig.getMetadataUrlPrefix();
+            descriptor.setValue(CubeManager.getHtableMetadataKey(), sourcePrefix);
+            hbaseAdmin.modifyTable(htable, descriptor);
+            hbaseAdmin.enableTable(htable);
+            break;
+        }
+        case COPY_FILE_IN_META: {
+            // no harm
+            logger.info("Undo for COPY_FILE_IN_META is ignored");
+            break;
+        }
+        case COPY_DICT_OR_SNAPSHOT: {
+            // no harm
+            logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+            break;
+        }
+        case RENAME_FOLDER_IN_HDFS: {
+            // Parameters are swapped on purpose: the forward target is the undo source.
+            String srcPath = (String) opt.params[1];
+            String dstPath = (String) opt.params[0];
+
+            // Only move back when the forward rename took place and the
+            // original location is free.
+            if (hdfsFS.exists(new Path(srcPath))) {
+                if (!hdfsFS.exists(new Path(dstPath))) {
+                    hdfsFS.rename(new Path(srcPath), new Path(dstPath));
+                    logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+                }
+            }
+            break;
+        }
+        case ADD_INTO_PROJECT: {
+            logger.info("Undo for ADD_INTO_PROJECT is ignored");
+            break;
+        }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
new file mode 100644
index 0000000..8c81e73
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.tools;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.httpclient.ConnectTimeoutException;
+import org.apache.commons.httpclient.HttpClientError;
+import org.apache.commons.httpclient.params.HttpConnectionParams;
+import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
+import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Secure socket factory for commons-httpclient whose sockets come from an
+ * {@link SSLContext} initialised with a single {@link DefaultX509TrustManager}.
+ * The context is created lazily on first use.
+ *
+ * @author xduo
+ *
+ */
+public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
+    /** Log object for this class. */
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
+    private SSLContext sslcontext = null;
+
+    /**
+     * Constructor for DefaultSslProtocolSocketFactory.
+     */
+    public DefaultSslProtocolSocketFactory() {
+        super();
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
+     */
+    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
+    }
+
+    /**
+     * Attempts to get a new socket connection to the given host within the
+     * given time limit.
+     *
+     * <p>
+     * To circumvent the limitations of older JREs that do not support connect
+     * timeout a controller thread is executed. The controller thread attempts
+     * to create a new socket within the given limit of time. If socket
+     * constructor does not return until the timeout expires, the controller
+     * terminates and throws an {@link ConnectTimeoutException}
+     * </p>
+     *
+     * @param host
+     *            the host name/IP
+     * @param port
+     *            the port on the host
+     * @param localAddress
+     *            the local host name/IP to bind the socket to
+     * @param localPort
+     *            the port on the local machine
+     * @param params
+     *            {@link HttpConnectionParams Http connection parameters}
+     *
+     * @return Socket a new socket
+     *
+     * @throws IOException
+     *             if an I/O error occurs while creating the socket
+     * @throws UnknownHostException
+     *             if the IP address of the host cannot be determined
+     * @throws ConnectTimeoutException
+     *             if the socket could not be created within the configured
+     *             connection timeout
+     * @throws IllegalArgumentException
+     *             if {@code params} is null
+     */
+    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
+        if (params == null) {
+            throw new IllegalArgumentException("Parameters may not be null");
+        }
+
+        int timeout = params.getConnectionTimeout();
+
+        if (timeout == 0) {
+            // A timeout of 0 takes the plain, unbounded connect path.
+            return createSocket(host, port, localAddress, localPort);
+        } else {
+            // To be eventually deprecated when migrated to Java 1.4 or above
+            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
+        }
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
+     */
+    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port);
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
+     */
+    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
+    }
+
+    /**
+     * All instances of this exact class are considered equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        // Compare against this factory class, not the trust manager class.
+        return ((obj != null) && obj.getClass().equals(DefaultSslProtocolSocketFactory.class));
+    }
+
+    /**
+     * Constant per class, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return DefaultSslProtocolSocketFactory.class.hashCode();
+    }
+
+    /**
+     * Builds a TLS context whose only trust manager is a
+     * {@link DefaultX509TrustManager} created with a null key store.
+     *
+     * @throws HttpClientError
+     *             if the context cannot be created or initialised
+     */
+    private static SSLContext createEasySSLContext() {
+        try {
+            SSLContext context = SSLContext.getInstance("TLS");
+            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
+
+            return context;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new HttpClientError(e.toString());
+        }
+    }
+
+    /** Returns the SSL context, creating it on first use. */
+    private SSLContext getSSLContext() {
+        if (this.sslcontext == null) {
+            this.sslcontext = createEasySSLContext();
+        }
+
+        return this.sslcontext;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java b/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
new file mode 100644
index 0000000..8dedaa6
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.tools;
+
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lenient X509 trust manager.
+ *
+ * <p>
+ * NOTE(review): {@link #checkClientTrusted} and {@link #checkServerTrusted}
+ * are empty, so any certificate chain is accepted when this manager is
+ * installed in an SSL context. That disables certificate verification and
+ * should be a conscious choice of the caller.
+ * </p>
+ *
+ * @author xduo
+ *
+ */
+public class DefaultX509TrustManager implements X509TrustManager {
+
+    /** Log object for this class. */
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
+    private X509TrustManager standardTrustManager = null;
+
+    /**
+     * Constructor for DefaultX509TrustManager.
+     *
+     * @param keystore
+     *            key store handed to the platform trust manager factory; may be
+     *            null
+     * @throws NoSuchAlgorithmException
+     *             if the factory algorithm is unavailable or yields no trust
+     *             manager
+     * @throws KeyStoreException
+     *             if the factory cannot be initialised from the key store
+     */
+    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
+        super();
+
+        // Ask the trust manager factory for its own default algorithm rather
+        // than borrowing the key manager factory's default.
+        String algorithm = TrustManagerFactory.getDefaultAlgorithm();
+        TrustManagerFactory factory = TrustManagerFactory.getInstance(algorithm);
+        factory.init(keystore);
+
+        TrustManager[] trustmanagers = factory.getTrustManagers();
+
+        if (trustmanagers.length == 0) {
+            throw new NoSuchAlgorithmException(algorithm + " trust manager not supported");
+        }
+
+        this.standardTrustManager = (X509TrustManager) trustmanagers[0];
+    }
+
+    /** Delegates to the platform trust manager obtained in the constructor. */
+    public X509Certificate[] getAcceptedIssuers() {
+        return this.standardTrustManager.getAcceptedIssuers();
+    }
+
+    /** Always reports the client chain as trusted. */
+    public boolean isClientTrusted(X509Certificate[] certificates) {
+        return true;
+        // return this.standardTrustManager.isClientTrusted(certificates);
+    }
+
+    /**
+     * Reports whether the server chain is acceptable: a chain of exactly one
+     * certificate must pass its validity-period check; every other chain
+     * (including null) is accepted.
+     */
+    public boolean isServerTrusted(X509Certificate[] certificates) {
+        if ((certificates != null) && LOG.isDebugEnabled()) {
+            LOG.debug("Server certificate chain:");
+
+            for (int i = 0; i < certificates.length; i++) {
+                LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
+            }
+        }
+
+        if ((certificates != null) && (certificates.length == 1)) {
+            X509Certificate certificate = certificates[0];
+
+            try {
+                certificate.checkValidity();
+            } catch (CertificateException e) {
+                LOG.error(e.toString());
+
+                return false;
+            }
+
+            return true;
+        } else {
+            return true;
+            // return this.standardTrustManager.isServerTrusted(certificates);
+        }
+    }
+
+    /**
+     * Accepts every client chain: the method never throws.
+     */
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+        // Intentionally empty: no client certificate is ever rejected.
+
+    }
+
+    /**
+     * Accepts every server chain: the method never throws.
+     */
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+        // Intentionally empty: no server certificate is ever rejected.
+        // NOTE(review): this disables server certificate verification.
+
+    }
+
+}