You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:46:39 UTC

[18/51] [partial] incubator-kylin git commit: migrate repo from github.com to apache git

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
new file mode 100644
index 0000000..022963d
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class IIBulkLoadJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+
+            FileSystem fs = FileSystem.get(getConf());
+            FsPermission permission = new FsPermission((short) 0777);
+            fs.setPermission(new Path(input, InvertedIndexDesc.HBASE_FAMILY), permission);
+
+            int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
+
+            CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = mgr.getCube(cubeName);
+            CubeSegment seg = cube.getFirstSegment();
+            seg.setStorageLocationIdentifier(tableName);
+            seg.setStatus(CubeSegmentStatusEnum.READY);
+            mgr.updateCube(cube);
+
+            return hbaseExitCode;
+
+        } catch (Exception e) {
+            printUsage(options);
+            e.printStackTrace(System.err);
+            return 2;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        IIBulkLoadJob job = new IIBulkLoadJob();
+        job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
new file mode 100644
index 0000000..0c1afd0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.File;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class IICreateHFileJob extends AbstractHadoopJob {
+
+    protected static final Logger log = LoggerFactory.getLogger(IICreateHFileJob.class);
+
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+            if (JarFile.exists()) {
+                job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+            } else {
+                job.setJarByClass(this.getClass());
+            }
+
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+            FileOutputFormat.setOutputPath(job, output);
+
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(IICreateHFileMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            HTable htable = new HTable(getConf(), tableName);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        IICreateHFileJob job = new IICreateHFileJob();
+        job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
new file mode 100644
index 0000000..2ceaa1c
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import static com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class IICreateHFileMapper extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
+
+    long timestamp;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+
+        KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
+                HBASE_FAMILY_BYTES, 0, HBASE_FAMILY_BYTES.length, //
+                HBASE_QUALIFIER_BYTES, 0, HBASE_QUALIFIER_BYTES.length, //
+                timestamp, Type.Put, //
+                value.get(), value.getOffset(), value.getLength());
+
+        context.write(key, kv);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
new file mode 100644
index 0000000..04fd274
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.invertedindex.IIKeyValueCodec;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class IICreateHTableJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+
+            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            int sharding = cubeInstance.getInvertedIndexDesc().getSharding();
+
+            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+            HColumnDescriptor cf = new HColumnDescriptor(InvertedIndexDesc.HBASE_FAMILY);
+            cf.setMaxVersions(1);
+            cf.setCompressionType(Algorithm.LZO);
+            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+            tableDesc.addFamily(cf);
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            // drop the table first
+            HBaseAdmin admin = new HBaseAdmin(conf);
+            if (admin.tableExists(tableName)) {
+                admin.disableTable(tableName);
+                admin.deleteTable(tableName);
+            }
+
+            // create table
+            byte[][] splitKeys = getSplits(sharding);
+            if (splitKeys.length == 0)
+                splitKeys = null;
+            admin.createTable(tableDesc, splitKeys);
+            if (splitKeys != null) {
+                for (int i = 0; i < splitKeys.length; i++) {
+                    System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
+                }
+            }
+            System.out.println("create hbase table " + tableName + " done.");
+            admin.close();
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            e.printStackTrace(System.err);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+    }
+
+    //one region for one shard
+    private byte[][] getSplits(int shard) {
+        byte[][] result = new byte[shard - 1][];
+        for (int i = 1; i < shard; ++i) {
+            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+    public static void main(String[] args) throws Exception {
+        IICreateHTableJob job = new IICreateHTableJob();
+        job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
new file mode 100644
index 0000000..83219d2
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.kylinolap.common.util.ByteArray;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsCombiner extends Reducer<ShortWritable, Text, ShortWritable, Text> {
+
+    private Text outputValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+    }
+
+    @Override
+    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        HashSet<ByteArray> set = new HashSet<ByteArray>();
+        for (Text textValue : values) {
+            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+            set.add(value);
+        }
+
+        for (ByteArray value : set) {
+            outputValue.set(value.data);
+            context.write(key, outputValue);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
new file mode 100644
index 0000000..bc12db2
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.schema.ColumnDesc;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsJob extends AbstractHadoopJob {
+    protected static final Logger log = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_INPUT_FORMAT);
+            options.addOption(OPTION_INPUT_DELIM);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
+            String inputDelim = getOptionValue(OPTION_INPUT_DELIM);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            System.out.println("Starting: " + job.getJobName());
+
+            setupMapInput(input, inputFormat, inputDelim);
+            setupReduceOutput(output);
+
+            // pass table and columns
+            MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
+            job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(table));
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+
+    }
+
+    private String getColumns(TableDesc table) {
+        StringBuilder buf = new StringBuilder();
+        for (ColumnDesc col : table.getColumns()) {
+            if (buf.length() > 0)
+                buf.append(",");
+            buf.append(col.getName());
+        }
+        return buf.toString();
+    }
+
+    private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException {
+        FileInputFormat.setInputPaths(job, input);
+
+        File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+        if (JarFile.exists()) {
+            job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+        } else {
+            job.setJarByClass(this.getClass());
+        }
+
+        if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
+            job.setInputFormatClass(TextInputFormat.class);
+        } else {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        if ("t".equals(inputDelim)) {
+            inputDelim = "\t";
+        } else if ("177".equals(inputDelim)) {
+            inputDelim = "\177";
+        }
+        if (inputDelim != null) {
+            job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
+        }
+
+        job.setMapperClass(IIDistinctColumnsMapper.class);
+        job.setCombinerClass(IIDistinctColumnsCombiner.class);
+        job.setMapOutputKeyClass(ShortWritable.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    private void setupReduceOutput(Path output) throws IOException {
+        job.setReducerClass(IIDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        job.setNumReduceTasks(1);
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    public static void main(String[] args) throws Exception {
+        IIDistinctColumnsJob job = new IIDistinctColumnsJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
new file mode 100644
index 0000000..b7456bf
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.kylinolap.cube.common.BytesSplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, Text, ShortWritable, Text> {
+
+    private String[] columns;
+    private int delim;
+    private BytesSplitter splitter;
+
+    private ShortWritable outputKey = new ShortWritable();
+    private Text outputValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
+        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
+        this.splitter = new BytesSplitter(200, 4096);
+    }
+
+    @Override
+    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+        if (delim == -1) {
+            delim = splitter.detectDelim(value, columns.length);
+        }
+
+        int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
+        SplittedBytes[] parts = splitter.getSplitBuffers();
+
+        if (nParts != columns.length) {
+            throw new RuntimeException("Got " + parts.length + " from -- " + value.toString() + " -- but only " + columns.length + " expected");
+        }
+
+        for (short i = 0; i < nParts; i++) {
+            outputKey.set(i);
+            outputValue.set(parts[i].value, 0, parts[i].length);
+            context.write(outputKey, outputValue);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
new file mode 100644
index 0000000..f170057
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.kylinolap.common.util.ByteArray;
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsReducer extends Reducer<ShortWritable, Text, NullWritable, Text> {
+
+    private String[] columns;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+    }
+
+    @Override
+    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        String columnName = columns[key.get()];
+
+        HashSet<ByteArray> set = new HashSet<ByteArray>();
+        for (Text textValue : values) {
+            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+            set.add(value);
+        }
+
+        Configuration conf = context.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
+
+        try {
+            for (ByteArray value : set) {
+                out.write(value.data);
+                out.write('\n');
+            }
+        } finally {
+            out.close();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
new file mode 100644
index 0000000..6681db6
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexJob extends AbstractHadoopJob {
+    protected static final Logger log = LoggerFactory.getLogger(InvertedIndexJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_INPUT_FORMAT);
+            options.addOption(OPTION_INPUT_DELIM);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
+            String inputDelim = getOptionValue(OPTION_INPUT_DELIM);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            System.out.println("Starting: " + job.getJobName());
+            
+            CubeInstance cube = getCube(cubeName);
+
+            setupMapInput(input, inputFormat, inputDelim);
+            setupReduceOutput(output, cube.getInvertedIndexDesc().getSharding());
+            attachMetadata(cube);
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+
+    }
+
+    /**
+     * @param cubeName
+     * @return
+     */
+    private CubeInstance getCube(String cubeName) {
+        CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = mgr.getCube(cubeName);
+        if (cube == null)
+            throw new IllegalArgumentException("No Inverted Index Cubefound by name " + cubeName);
+        return cube;
+    }
+
+    private void attachMetadata(CubeInstance cube) throws IOException {
+
+        Configuration conf = job.getConfiguration();
+        attachKylinPropsAndMetadata(cube, conf);
+
+        CubeSegment seg = cube.getFirstSegment();
+        conf.set(BatchConstants.CFG_CUBE_NAME, cube.getName());
+        conf.set(BatchConstants.CFG_CUBE_SEGMENT_NAME, seg.getName());
+    }
+
+    private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException {
+        FileInputFormat.setInputPaths(job, input);
+
+        File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+        if (JarFile.exists()) {
+            job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+        } else {
+            job.setJarByClass(this.getClass());
+        }
+
+        if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
+            job.setInputFormatClass(TextInputFormat.class);
+        } else {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        if ("t".equals(inputDelim)) {
+            inputDelim = "\t";
+        } else if ("177".equals(inputDelim)) {
+            inputDelim = "\177";
+        }
+        if (inputDelim != null) {
+            job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
+        }
+
+        job.setMapperClass(InvertedIndexMapper.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
+        job.setPartitionerClass(InvertedIndexPartitioner.class);
+    }
+
+    private void setupReduceOutput(Path output, short sharding) throws IOException {
+        job.setReducerClass(InvertedIndexReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(ImmutableBytesWritable.class);
+        
+        job.setNumReduceTasks(sharding);
+
+        FileOutputFormat.setOutputPath(job, output);
+
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    public static void main(String[] args) throws Exception {
+        InvertedIndexJob job = new InvertedIndexJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
new file mode 100644
index 0000000..f555c40
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.common.BytesSplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.cube.invertedindex.TableRecord;
+import com.kylinolap.cube.invertedindex.TableRecordInfo;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class InvertedIndexMapper<KEYIN> extends Mapper<KEYIN, Text, LongWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    private int delim;
+    private BytesSplitter splitter;
+
+    private LongWritable outputKey;
+    private ImmutableBytesWritable outputValue;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
+        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
+        this.splitter = new BytesSplitter(200, 4096);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        CubeManager mgr = CubeManager.getInstance(config);
+        CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
+        this.info = new TableRecordInfo(seg);
+        this.rec = new TableRecord(this.info);
+
+        outputKey = new LongWritable();
+        outputValue = new ImmutableBytesWritable(rec.getBytes());
+    }
+
+    @Override
+    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+        if (delim == -1) {
+            delim = splitter.detectDelim(value, info.getColumnCount());
+        }
+
+        int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
+        SplittedBytes[] parts = splitter.getSplitBuffers();
+
+        if (nParts != info.getColumnCount()) {
+            throw new RuntimeException("Got " + parts.length + " from -- " + value.toString() + " -- but only " + info.getColumnCount() + " expected");
+        }
+
+        rec.reset();
+        for (int i = 0; i < nParts; i++) {
+            rec.setValueString(i, Bytes.toString(parts[i].value, 0, parts[i].length));
+        }
+
+        outputKey.set(rec.getTimestamp());
+        // outputValue's backing bytes array is the same as rec
+
+        context.write(outputKey, outputValue);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
new file mode 100644
index 0000000..bd06d74
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.invertedindex.TableRecord;
+import com.kylinolap.cube.invertedindex.TableRecordInfo;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
+
+    private Configuration conf;
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    @Override
+    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
+        rec.setBytes(value.get(), value.getOffset(), value.getLength());
+        return rec.getShard();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        try {
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+            CubeManager mgr = CubeManager.getInstance(config);
+            CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+            CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
+            this.info = new TableRecordInfo(seg);
+            this.rec = new TableRecord(this.info);
+        } catch (IOException e) {
+            throw new RuntimeException("", e);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
new file mode 100644
index 0000000..09954a1
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.invertedindex.IIKeyValueCodec;
+import com.kylinolap.cube.invertedindex.TableRecord;
+import com.kylinolap.cube.invertedindex.TableRecordInfo;
+import com.kylinolap.cube.invertedindex.Slice;
+import com.kylinolap.cube.invertedindex.SliceBuilder;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexReducer extends Reducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    private SliceBuilder builder;
+    private IIKeyValueCodec kv;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        CubeManager mgr = CubeManager.getInstance(config);
+        CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
+        info = new TableRecordInfo(seg);
+        rec = new TableRecord(info);
+        builder = null;
+        kv = new IIKeyValueCodec(info);
+    }
+
+    @Override
+    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+            throws IOException, InterruptedException {
+        for (ImmutableBytesWritable v : values) {
+            rec.setBytes(v.get(), v.getOffset(), v.getLength());
+
+            if (builder == null) {
+                builder = new SliceBuilder(info, rec.getShard());
+            }
+            System.out.println(rec.getShard() + " - " + rec);
+
+            Slice slice = builder.append(rec);
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        Slice slice = builder.close();
+        if (slice != null) {
+            output(slice, context);
+        }
+    }
+
+    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+        for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getFirst(), pair.getSecond());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
new file mode 100644
index 0000000..096ae86
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author ysong1
+ * 
+ */
+@SuppressWarnings("static-access")
+public class RandomKeyDistributionJob extends AbstractHadoopJob {
+
+    protected static final Logger log = LoggerFactory.getLogger(RandomKeyDistributionJob.class);
+
+    static final Option OPTION_KEY_CLASS = OptionBuilder.withArgName("keyclass").hasArg().isRequired(true).withDescription("Key Class").create("keyclass");
+    static final Option OPTION_REGION_MB = OptionBuilder.withArgName("regionmb").hasArg().isRequired(true).withDescription("MB per Region").create("regionmb");
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_KEY_CLASS);
+            options.addOption(OPTION_REGION_MB);
+
+            parseOptions(options, args);
+
+            // start job
+            String jobName = getOptionValue(OPTION_JOB_NAME);
+            job = Job.getInstance(getConf(), jobName);
+
+            job.setJarByClass(this.getClass());
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, output);
+
+            String keyClass = getOptionValue(OPTION_KEY_CLASS);
+            Class<?> keyClz = Class.forName(keyClass);
+
+            int regionMB = Integer.parseInt(getOptionValue(OPTION_REGION_MB));
+
+            // Mapper
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(RandomKeyDistributionMapper.class);
+            job.setMapOutputKeyClass(keyClz);
+            job.setMapOutputValueClass(NullWritable.class);
+
+            // Reducer - only one
+            job.setReducerClass(RandomKeyDistributionReducer.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(keyClz);
+            job.setOutputValueClass(NullWritable.class);
+            job.setNumReduceTasks(1);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            // total map input MB
+            double totalMapInputMB = this.getTotalMapInputMB();
+            int regionCount = Math.max(1, (int) (totalMapInputMB / regionMB));
+            int mapSampleNumber = 1000;
+            System.out.println("Total Map Input MB: " + totalMapInputMB);
+            System.out.println("Region Count: " + regionCount);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.MAPPER_SAMPLE_NUMBER, String.valueOf(mapSampleNumber));
+            job.getConfiguration().set(BatchConstants.REGION_NUMBER, String.valueOf(regionCount));
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            log.error(e.getLocalizedMessage(), e);
+            return 2;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new RandomKeyDistributionJob(), args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
new file mode 100644
index 0000000..c434f69
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.kylinolap.common.util.RandomSampler;
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class RandomKeyDistributionMapper<KEY extends Writable, VALUE> extends Mapper<KEY, VALUE, KEY, NullWritable> {
+
+    private Configuration conf;
+    private int sampleNumber;
+    private List<KEY> allKeys;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        conf = context.getConfiguration();
+        allKeys = new ArrayList<KEY>();
+        sampleNumber = Integer.parseInt(conf.get(BatchConstants.MAPPER_SAMPLE_NUMBER));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException {
+        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
+        ReflectionUtils.copy(conf, key, keyCopy);
+        allKeys.add(keyCopy);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        RandomSampler<KEY> sampler = new RandomSampler<KEY>();
+        List<KEY> sampleResult = sampler.sample(allKeys, sampleNumber);
+        for (KEY k : sampleResult) {
+            context.write(k, NullWritable.get());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
new file mode 100644
index 0000000..f5475f2
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.kylinolap.job.constant.BatchConstants;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class RandomKeyDistributionReducer<KEY extends Writable> extends Reducer<KEY, NullWritable, KEY, NullWritable> {
+
+    private Configuration conf;
+    private int regionNumber;
+    private List<KEY> allSplits;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        conf = context.getConfiguration();
+        allSplits = new ArrayList<KEY>();
+        regionNumber = Integer.parseInt(context.getConfiguration().get(BatchConstants.REGION_NUMBER));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void reduce(KEY key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
+        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
+        ReflectionUtils.copy(conf, key, keyCopy);
+        allSplits.add(keyCopy);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        int stepLength = allSplits.size() / regionNumber;
+        for (int i = stepLength; i < allSplits.size(); i += stepLength) {
+            context.write(allSplits.get(i), NullWritable.get());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java b/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
new file mode 100644
index 0000000..28237ca
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
@@ -0,0 +1,418 @@
+package com.kylinolap.job.tools;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.JsonSerializer;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.persistence.Serializer;
+import com.kylinolap.cube.*;
+import com.kylinolap.cube.project.ProjectInstance;
+import com.kylinolap.dict.DictionaryInfo;
+import com.kylinolap.dict.DictionaryManager;
+import com.kylinolap.dict.lookup.SnapshotManager;
+import com.kylinolap.dict.lookup.SnapshotTable;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.schema.TableDesc;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by honma on 9/3/14.
+ * <p/>
+ * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from
+ * dev env to test(prod) env, or vice versa.
+ * <p/>
+ * Note that different envs are assumed to share the same hadoop cluster,
+ * including hdfs, hbase and hive.
+ */
+public class CubeMigrationCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class);
+
+    private static List<Opt> operations;
+    private static KylinConfig srcConfig;
+    private static KylinConfig dstConfig;
+    private static ResourceStore srcStore;
+    private static ResourceStore dstStore;
+    private static FileSystem hdfsFS;
+    private static HBaseAdmin hbaseAdmin;
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+
+        if (args.length != 6) {
+            usage();
+            System.exit(1);
+        }
+
+        moveCube(args[0], args[1], args[2], args[3], args[4], args[5]);
+    }
+
+    private static void usage() {
+        System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName overwriteIfExists realExecute");
+        System.out.println(
+                " srcKylinConfigUri: The KylinConfig of the cube’s source \n" +
+                        "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" +
+                        "cubeName: the name of cube to be migrated. \n" +
+                        "projectName: The target project in the target environment.(Make sure it exist) \n" +
+                        "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" +
+                        "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
+
+    }
+
+    public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+
+        srcConfig = srcCfg;
+        srcStore = ResourceStore.getStore(srcConfig);
+        dstConfig = dstCfg;
+        dstStore = ResourceStore.getStore(dstConfig);
+
+        CubeManager cubeManager = CubeManager.getInstance(srcConfig);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        logger.info("cube to be moved is : " + cubeName);
+
+        if (cube.getStatus() != CubeStatusEnum.READY)
+            throw new IllegalStateException("Cannot migrate cube that is not in READY state.");
+
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getStatus() != CubeSegmentStatusEnum.READY) {
+                throw new IllegalStateException("At least one segment is not in READY state");
+            }
+        }
+
+        checkAndGetHbaseUrl();
+
+        Configuration conf = HBaseConfiguration.create();
+        hbaseAdmin = new HBaseAdmin(conf);
+
+        hdfsFS = FileSystem.get(new Configuration());
+
+        operations = new ArrayList<Opt>();
+
+        copyFilesInMetaStore(cube, overwriteIfExists);
+        renameFoldersInHdfs(cube);
+        changeHtableHost(cube);
+        addCubeIntoProject(cubeName, projectName);
+
+        if (realExecute.equalsIgnoreCase("true")) {
+            doOpts();
+        } else {
+            showOpts();
+        }
+    }
+
+    public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+
+        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, overwriteIfExists, realExecute);
+    }
+
+    private static String checkAndGetHbaseUrl() {
+        String srcMetadataUrl = srcConfig.getMetadataUrl();
+        String dstMetadataUrl = dstConfig.getMetadataUrl();
+
+        logger.info("src metadata url is " + srcMetadataUrl);
+        logger.info("dst metadata url is " + dstMetadataUrl);
+
+        int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase:");
+        int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase:");
+        if (srcIndex < 0 || dstIndex < 0)
+            throw new IllegalStateException("Both metadata urls should be hbase metadata url");
+
+        String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim();
+        String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim();
+        if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) {
+            throw new IllegalStateException("hbase url not equal! ");
+        }
+
+        logger.info("hbase url is " + srcHbaseUrl.trim());
+        return srcHbaseUrl.trim();
+    }
+
+    private static void renameFoldersInHdfs(CubeInstance cube) {
+        for (CubeSegment segment : cube.getSegments()) {
+
+            String jobUuid = segment.getLastBuildJobID();
+            String src = JobInstance.getJobWorkingDir(jobUuid, srcConfig.getHdfsWorkingDirectory());
+            String tgt = JobInstance.getJobWorkingDir(jobUuid, dstConfig.getHdfsWorkingDirectory());
+
+            operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
+        }
+
+    }
+
+    private static void changeHtableHost(CubeInstance cube) {
+        for (CubeSegment segment : cube.getSegments()) {
+            operations.add(new Opt(OptType.CHANGE_HTABLE_HOST,
+                    new Object[] { segment.getStorageLocationIdentifier() }));
+        }
+    }
+
+    private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException {
+
+        List<String> metaItems = new ArrayList<String>();
+        List<String> dictAndSnapshot = new ArrayList<String>();
+        listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+
+        if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true"))
+            throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
+
+        for (String item : metaItems) {
+            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
+        }
+
+        for (String item : dictAndSnapshot) {
+            operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+        }
+    }
+
+    private static void addCubeIntoProject(String cubeName, String projectName) throws IOException {
+        String projectResPath = ProjectInstance.concatResourcePath(projectName);
+        if (!dstStore.exists(projectResPath))
+            throw new IllegalStateException("The target project " + projectName + "does not exist");
+
+        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { cubeName, projectName }));
+    }
+
+    private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, List<String> dictAndSnapshot) throws IOException {
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        metaResource.add(cube.getResourcePath());
+        metaResource.add(cubeDesc.getResourcePath());
+
+        for (TableDesc tableDesc : cubeDesc.listTables()) {
+            metaResource.add(tableDesc.getResourcePath());
+        }
+
+        for (CubeSegment segment : cube.getSegments()) {
+            dictAndSnapshot.addAll(segment.getSnapshotPaths());
+            dictAndSnapshot.addAll(segment.getDictionaryPaths());
+        }
+    }
+
+    private static enum OptType {
+        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST
+    }
+
+    private static class Opt {
+        private OptType type;
+        private Object[] params;
+
+        private Opt(OptType type, Object[] params) {
+            this.type = type;
+            this.params = params;
+        }
+
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(type).append(":");
+            for (Object s : params)
+                sb.append(s).append(", ");
+            return sb.toString();
+        }
+
+    }
+
+    private static void showOpts() {
+        for (int i = 0; i < operations.size(); ++i) {
+            showOpt(operations.get(i));
+        }
+    }
+
+    private static void showOpt(Opt opt) {
+        logger.info("Operation: " + opt.toString());
+    }
+
+    private static void doOpts() throws IOException, InterruptedException {
+        int index = 0;
+        try {
+            for (; index < operations.size(); ++index) {
+                logger.info("Operation index :" + index);
+                doOpt(operations.get(index));
+            }
+        } catch (Exception e) {
+            logger.error("error met", e);
+            logger.info("Try undoing previous changes");
+            // undo:
+            for (int i = index; i >= 0; --i) {
+                try {
+                    undo(operations.get(i));
+                } catch (Exception ee) {
+                    logger.error("error met ", e);
+                    logger.info("Continue undoing...");
+                }
+            }
+
+            throw new RuntimeException("Cube moving failed");
+        }
+    }
+
+    private static void doOpt(Opt opt) throws IOException, InterruptedException {
+        logger.info("Executing operation: " + opt.toString());
+
+        switch (opt.type) {
+        case CHANGE_HTABLE_HOST: {
+            String tableName = (String) opt.params[0];
+            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            hbaseAdmin.disableTable(tableName);
+            desc.setValue(CubeManager.getHtableMetadataKey(), dstConfig.getMetadataUrlPrefix());
+            hbaseAdmin.modifyTable(tableName, desc);
+            hbaseAdmin.enableTable(tableName);
+            logger.info("CHANGE_HTABLE_HOST is completed");
+            break;
+        }
+        case COPY_FILE_IN_META: {
+            String item = (String) opt.params[0];
+            InputStream inputStream = srcStore.getResource(item);
+            long ts = srcStore.getResourceTimestamp(item);
+            dstStore.putResource(item, inputStream, ts);
+            inputStream.close();
+            logger.info("Item " + item + " is copied");
+            break;
+        }
+        case COPY_DICT_OR_SNAPSHOT: {
+            String item = (String) opt.params[0];
+
+            if (item.toLowerCase().endsWith(".dict")) {
+                DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
+                DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
+                DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
+
+                long ts = dictSrc.getLastModified();
+                dictSrc.setLastModified(0);//to avoid resource store write conflict
+                DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc);
+                dictSrc.setLastModified(ts);
+
+                if (dictSaved == dictSrc) {
+                    //no dup found, already saved to dest
+                    logger.info("Item " + item + " is copied");
+                } else {
+                    //dictSrc is rejected because of duplication
+                    //modify cube's dictionary path
+                    String cubeName = (String) opt.params[1];
+                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                    CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+                    for (CubeSegment segment : cube.getSegments()) {
+                        for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
+                            if (entry.getValue().equalsIgnoreCase(item)) {
+                                entry.setValue(dictSaved.getResourcePath());
+                            }
+                        }
+                    }
+                    dstStore.putResource(cubeResPath, cube, cubeSerializer);
+                    logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
+                }
+
+            } else if (item.toLowerCase().endsWith(".snapshot")) {
+                SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
+                SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
+                SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
+
+                long ts = snapSrc.getLastModified();
+                snapSrc.setLastModified(0);
+                SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
+                snapSrc.setLastModified(ts);
+
+
+                if (snapSaved == snapSrc) {
+                    //no dup found, already saved to dest
+                    logger.info("Item " + item + " is copied");
+
+                } else {
+                    String cubeName = (String) opt.params[1];
+                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                    CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+                    for (CubeSegment segment : cube.getSegments()) {
+                        for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
+                            if (entry.getValue().equalsIgnoreCase(item)) {
+                                entry.setValue(snapSaved.getResourcePath());
+                            }
+                        }
+                    }
+                    dstStore.putResource(cubeResPath, cube, cubeSerializer);
+                    logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
+
+                }
+
+            } else {
+                logger.error("unknown item found: " + item);
+                logger.info("ignore it");
+            }
+
+            break;
+        }
+        case RENAME_FOLDER_IN_HDFS: {
+            String srcPath = (String) opt.params[0];
+            String dstPath = (String) opt.params[1];
+            hdfsFS.rename(new Path(srcPath), new Path(dstPath));
+            logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+            break;
+        }
+        case ADD_INTO_PROJECT: {
+            String cubeName = (String) opt.params[0];
+            String projectName = (String) opt.params[1];
+            String projectResPath = ProjectInstance.concatResourcePath(projectName);
+            Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+            ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
+            project.removeCube(cubeName);
+            project.addCube(cubeName);
+            dstStore.putResource(projectResPath, project, projectSerializer);
+            logger.info("Project instance for " + projectName + " is corrected");
+            break;
+        }
+        }
+    }
+
+    private static void undo(Opt opt) throws IOException, InterruptedException {
+        logger.info("Undo operation: " + opt.toString());
+
+        switch (opt.type) {
+        case CHANGE_HTABLE_HOST: {
+            String tableName = (String) opt.params[0];
+            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            hbaseAdmin.disableTable(tableName);
+            desc.setValue(CubeManager.getHtableMetadataKey(), srcConfig.getMetadataUrlPrefix());
+            hbaseAdmin.modifyTable(tableName, desc);
+            hbaseAdmin.enableTable(tableName);
+            break;
+        }
+        case COPY_FILE_IN_META: {
+            // no harm
+            logger.info("Undo for COPY_FILE_IN_META is ignored");
+            break;
+        }
+        case COPY_DICT_OR_SNAPSHOT: {
+            // no harm
+            logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+            break;
+        }
+        case RENAME_FOLDER_IN_HDFS: {
+            String srcPath = (String) opt.params[1];
+            String dstPath = (String) opt.params[0];
+
+            if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
+                hdfsFS.rename(new Path(srcPath), new Path(dstPath));
+                logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+            }
+            break;
+        }
+        case ADD_INTO_PROJECT: {
+            logger.info("Undo for ADD_INTO_PROJECT is ignored");
+            break;
+        }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
new file mode 100644
index 0000000..8c81e73
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.tools;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.httpclient.ConnectTimeoutException;
+import org.apache.commons.httpclient.HttpClientError;
+import org.apache.commons.httpclient.params.HttpConnectionParams;
+import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
+import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xduo
+ * 
+ */
+public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
+    /** Log object for this class. */
+    private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
+    private SSLContext sslcontext = null;
+
+    /**
+     * Constructor for DefaultSslProtocolSocketFactory.
+     */
+    public DefaultSslProtocolSocketFactory() {
+        super();
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
+     */
+    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
+    }
+
+    /**
+     * Attempts to get a new socket connection to the given host within the
+     * given time limit.
+     * 
+     * <p>
+     * To circumvent the limitations of older JREs that do not support connect
+     * timeout a controller thread is executed. The controller thread attempts
+     * to create a new socket within the given limit of time. If socket
+     * constructor does not return until the timeout expires, the controller
+     * terminates and throws an {@link ConnectTimeoutException}
+     * </p>
+     * 
+     * @param host
+     *            the host name/IP
+     * @param port
+     *            the port on the host
+     * @param localAddress
+     *            the local host name/IP to bind the socket to
+     * @param localPort
+     *            the port on the local machine
+     * @param params
+     *            {@link HttpConnectionParams Http connection parameters}
+     * 
+     * @return Socket a new socket
+     * 
+     * @throws IOException
+     *             if an I/O error occurs while creating the socket
+     * @throws UnknownHostException
+     *             if the IP address of the host cannot be determined
+     * @throws ConnectTimeoutException
+     *             DOCUMENT ME!
+     * @throws IllegalArgumentException
+     *             DOCUMENT ME!
+     */
+    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
+        if (params == null) {
+            throw new IllegalArgumentException("Parameters may not be null");
+        }
+
+        int timeout = params.getConnectionTimeout();
+
+        if (timeout == 0) {
+            return createSocket(host, port, localAddress, localPort);
+        } else {
+            // To be eventually deprecated when migrated to Java 1.4 or above
+            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
+        }
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
+     */
+    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port);
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
+     */
+    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
+    }
+
+    public boolean equals(Object obj) {
+        return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
+    }
+
+    public int hashCode() {
+        return DefaultX509TrustManager.class.hashCode();
+    }
+
+    private static SSLContext createEasySSLContext() {
+        try {
+            SSLContext context = SSLContext.getInstance("TLS");
+            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
+
+            return context;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new HttpClientError(e.toString());
+        }
+    }
+
+    private SSLContext getSSLContext() {
+        if (this.sslcontext == null) {
+            this.sslcontext = createEasySSLContext();
+        }
+
+        return this.sslcontext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java b/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
new file mode 100644
index 0000000..8dedaa6
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.tools;
+
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xduo
+ * 
+ */
+public class DefaultX509TrustManager implements X509TrustManager {
+
+    /** Log object for this class. */
+    private static Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
+    private X509TrustManager standardTrustManager = null;
+
+    /**
+     * Constructor for DefaultX509TrustManager.
+     * 
+     */
+    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
+        super();
+
+        TrustManagerFactory factory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        factory.init(keystore);
+
+        TrustManager[] trustmanagers = factory.getTrustManagers();
+
+        if (trustmanagers.length == 0) {
+            throw new NoSuchAlgorithmException("SunX509 trust manager not supported");
+        }
+
+        this.standardTrustManager = (X509TrustManager) trustmanagers[0];
+    }
+
+    public X509Certificate[] getAcceptedIssuers() {
+        return this.standardTrustManager.getAcceptedIssuers();
+    }
+
+    public boolean isClientTrusted(X509Certificate[] certificates) {
+        return true;
+        // return this.standardTrustManager.isClientTrusted(certificates);
+    }
+
+    public boolean isServerTrusted(X509Certificate[] certificates) {
+        if ((certificates != null) && LOG.isDebugEnabled()) {
+            LOG.debug("Server certificate chain:");
+
+            for (int i = 0; i < certificates.length; i++) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
+                }
+            }
+        }
+
+        if ((certificates != null) && (certificates.length == 1)) {
+            X509Certificate certificate = certificates[0];
+
+            try {
+                certificate.checkValidity();
+            } catch (CertificateException e) {
+                LOG.error(e.toString());
+
+                return false;
+            }
+
+            return true;
+        } else {
+            return true;
+            // return this.standardTrustManager.isServerTrusted(certificates);
+        }
+    }
+
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+        // TODO Auto-generated method stub
+
+    }
+
+}