You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:22:38 UTC
[18/97] [abbrv] [partial] incubator-kylin git commit: cleanup for
migration from github.com
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
deleted file mode 100644
index 022963d..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIBulkLoadJob.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc;
-
-/**
- * @author ysong1
- *
- */
-public class IIBulkLoadJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_CUBE_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- String input = getOptionValue(OPTION_INPUT_PATH);
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
-
- FileSystem fs = FileSystem.get(getConf());
- FsPermission permission = new FsPermission((short) 0777);
- fs.setPermission(new Path(input, InvertedIndexDesc.HBASE_FAMILY), permission);
-
- int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
-
- CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = mgr.getCube(cubeName);
- CubeSegment seg = cube.getFirstSegment();
- seg.setStorageLocationIdentifier(tableName);
- seg.setStatus(CubeSegmentStatusEnum.READY);
- mgr.updateCube(cube);
-
- return hbaseExitCode;
-
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- IIBulkLoadJob job = new IIBulkLoadJob();
- job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
deleted file mode 100644
index 0c1afd0..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileJob.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.File;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author yangli9
- *
- */
-public class IICreateHFileJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(IICreateHFileJob.class);
-
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
- FileOutputFormat.setOutputPath(job, output);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(IICreateHFileMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- HTable htable = new HTable(getConf(), tableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- IICreateHFileJob job = new IICreateHFileJob();
- job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
deleted file mode 100644
index 2ceaa1c..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHFileMapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import static com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc.*;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * @author yangli9
- *
- */
-public class IICreateHFileMapper extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
-
- long timestamp;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- timestamp = System.currentTimeMillis();
- }
-
- @Override
- protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
-
- KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
- HBASE_FAMILY_BYTES, 0, HBASE_FAMILY_BYTES.length, //
- HBASE_QUALIFIER_BYTES, 0, HBASE_QUALIFIER_BYTES.length, //
- timestamp, Type.Put, //
- value.get(), value.getOffset(), value.getLength());
-
- context.write(key, kv);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
deleted file mode 100644
index 04fd274..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IICreateHTableJob.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.BytesUtil;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.invertedindex.IIKeyValueCodec;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.invertedindex.InvertedIndexDesc;
-
-/**
- * @author George Song (ysong1)
- */
-public class IICreateHTableJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
-
- CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- int sharding = cubeInstance.getInvertedIndexDesc().getSharding();
-
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- HColumnDescriptor cf = new HColumnDescriptor(InvertedIndexDesc.HBASE_FAMILY);
- cf.setMaxVersions(1);
- cf.setCompressionType(Algorithm.LZO);
- cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- tableDesc.addFamily(cf);
-
- Configuration conf = HBaseConfiguration.create(getConf());
- if (User.isHBaseSecurityEnabled(conf)) {
- // add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- // drop the table first
- HBaseAdmin admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- }
-
- // create table
- byte[][] splitKeys = getSplits(sharding);
- if (splitKeys.length == 0)
- splitKeys = null;
- admin.createTable(tableDesc, splitKeys);
- if (splitKeys != null) {
- for (int i = 0; i < splitKeys.length; i++) {
- System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
- }
- }
- System.out.println("create hbase table " + tableName + " done.");
- admin.close();
-
- return 0;
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- //one region for one shard
- private byte[][] getSplits(int shard) {
- byte[][] result = new byte[shard - 1][];
- for (int i = 1; i < shard; ++i) {
- byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
- BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
- result[i - 1] = split;
- }
- return result;
- }
-
- public static void main(String[] args) throws Exception {
- IICreateHTableJob job = new IICreateHTableJob();
- job.setConf(HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl()));
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
deleted file mode 100644
index 83219d2..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.util.ByteArray;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsCombiner extends Reducer<ShortWritable, Text, ShortWritable, Text> {
-
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- for (ByteArray value : set) {
- outputValue.set(value.data);
- context.write(key, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
deleted file mode 100644
index bc12db2..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.MetadataManager;
-import com.kylinolap.metadata.model.schema.ColumnDesc;
-import com.kylinolap.metadata.model.schema.TableDesc;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_TABLE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_INPUT_FORMAT);
- options.addOption(OPTION_INPUT_DELIM);
- options.addOption(OPTION_OUTPUT_PATH);
- parseOptions(options, args);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
- String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
- String inputDelim = getOptionValue(OPTION_INPUT_DELIM);
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- // ----------------------------------------------------------------------------
-
- System.out.println("Starting: " + job.getJobName());
-
- setupMapInput(input, inputFormat, inputDelim);
- setupReduceOutput(output);
-
- // pass table and columns
- MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
- TableDesc table = metaMgr.getTableDesc(tableName);
- job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
- job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(table));
-
- return waitForCompletion(job);
-
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
-
- }
-
- private String getColumns(TableDesc table) {
- StringBuilder buf = new StringBuilder();
- for (ColumnDesc col : table.getColumns()) {
- if (buf.length() > 0)
- buf.append(",");
- buf.append(col.getName());
- }
- return buf.toString();
- }
-
- private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException {
- FileInputFormat.setInputPaths(job, input);
-
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
- job.setInputFormatClass(TextInputFormat.class);
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
-
- if ("t".equals(inputDelim)) {
- inputDelim = "\t";
- } else if ("177".equals(inputDelim)) {
- inputDelim = "\177";
- }
- if (inputDelim != null) {
- job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
- }
-
- job.setMapperClass(IIDistinctColumnsMapper.class);
- job.setCombinerClass(IIDistinctColumnsCombiner.class);
- job.setMapOutputKeyClass(ShortWritable.class);
- job.setMapOutputValueClass(Text.class);
- }
-
- private void setupReduceOutput(Path output) throws IOException {
- job.setReducerClass(IIDistinctColumnsReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
-
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
- job.setNumReduceTasks(1);
-
- deletePath(job.getConfiguration(), output);
- }
-
- public static void main(String[] args) throws Exception {
- IIDistinctColumnsJob job = new IIDistinctColumnsJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
deleted file mode 100644
index b7456bf..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.job.constant.BatchConstants;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, Text, ShortWritable, Text> {
-
- private String[] columns;
- private int delim;
- private BytesSplitter splitter;
-
- private ShortWritable outputKey = new ShortWritable();
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
- String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
- this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
- this.splitter = new BytesSplitter(200, 4096);
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- if (delim == -1) {
- delim = splitter.detectDelim(value, columns.length);
- }
-
- int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
- SplittedBytes[] parts = splitter.getSplitBuffers();
-
- if (nParts != columns.length) {
- throw new RuntimeException("Got " + parts.length + " from -- " + value.toString() + " -- but only " + columns.length + " expected");
- }
-
- for (short i = 0; i < nParts; i++) {
- outputKey.set(i);
- outputValue.set(parts[i].value, 0, parts[i].length);
- context.write(outputKey, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
deleted file mode 100644
index f170057..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.util.ByteArray;
-import com.kylinolap.job.constant.BatchConstants;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsReducer extends Reducer<ShortWritable, Text, NullWritable, Text> {
-
- private String[] columns;
-
- @Override
- protected void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- String columnName = columns[key.get()];
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- Configuration conf = context.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
- FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
-
- try {
- for (ByteArray value : set) {
- out.write(value.data);
- out.write('\n');
- }
- } finally {
- out.close();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
deleted file mode 100644
index 6681db6..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexJob.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(InvertedIndexJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_INPUT_FORMAT);
- options.addOption(OPTION_INPUT_DELIM);
- options.addOption(OPTION_OUTPUT_PATH);
- parseOptions(options, args);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
- String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
- String inputDelim = getOptionValue(OPTION_INPUT_DELIM);
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- // ----------------------------------------------------------------------------
-
- System.out.println("Starting: " + job.getJobName());
-
- CubeInstance cube = getCube(cubeName);
-
- setupMapInput(input, inputFormat, inputDelim);
- setupReduceOutput(output, cube.getInvertedIndexDesc().getSharding());
- attachMetadata(cube);
-
- return waitForCompletion(job);
-
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
-
- }
-
- /**
- * @param cubeName
- * @return
- */
- private CubeInstance getCube(String cubeName) {
- CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = mgr.getCube(cubeName);
- if (cube == null)
- throw new IllegalArgumentException("No Inverted Index Cubefound by name " + cubeName);
- return cube;
- }
-
- private void attachMetadata(CubeInstance cube) throws IOException {
-
- Configuration conf = job.getConfiguration();
- attachKylinPropsAndMetadata(cube, conf);
-
- CubeSegment seg = cube.getFirstSegment();
- conf.set(BatchConstants.CFG_CUBE_NAME, cube.getName());
- conf.set(BatchConstants.CFG_CUBE_SEGMENT_NAME, seg.getName());
- }
-
- private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException {
- FileInputFormat.setInputPaths(job, input);
-
- File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (JarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- if ("textinputformat".equalsIgnoreCase(inputFormat) || "text".equalsIgnoreCase(inputFormat)) {
- job.setInputFormatClass(TextInputFormat.class);
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
-
- if ("t".equals(inputDelim)) {
- inputDelim = "\t";
- } else if ("177".equals(inputDelim)) {
- inputDelim = "\177";
- }
- if (inputDelim != null) {
- job.getConfiguration().set(BatchConstants.INPUT_DELIM, inputDelim);
- }
-
- job.setMapperClass(InvertedIndexMapper.class);
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(ImmutableBytesWritable.class);
- job.setPartitionerClass(InvertedIndexPartitioner.class);
- }
-
- private void setupReduceOutput(Path output, short sharding) throws IOException {
- job.setReducerClass(InvertedIndexReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(ImmutableBytesWritable.class);
-
- job.setNumReduceTasks(sharding);
-
- FileOutputFormat.setOutputPath(job, output);
-
- job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
- deletePath(job.getConfiguration(), output);
- }
-
- public static void main(String[] args) throws Exception {
- InvertedIndexJob job = new InvertedIndexJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
deleted file mode 100644
index f555c40..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.invertedindex.TableRecord;
-import com.kylinolap.cube.invertedindex.TableRecordInfo;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * Mapper that splits each delimited text record into columns, loads them into
- * a {@link TableRecord} and emits the record bytes keyed by the record's
- * timestamp.
- *
- * @author yangli9
- *
- */
-public class InvertedIndexMapper<KEYIN> extends Mapper<KEYIN, Text, LongWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-    // delimiter code point; -1 until it has been configured or detected
-    private int delim;
-    private BytesSplitter splitter;
-
-    private LongWritable outputKey;
-    private ImmutableBytesWritable outputValue;
-
-    /**
-     * Reads the input delimiter from the job configuration, loads the cube
-     * segment named there and prepares the reusable record and output objects.
-     */
-    @Override
-    protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        String inputDelim = conf.get(BatchConstants.INPUT_DELIM);
-        this.delim = inputDelim == null ? -1 : inputDelim.codePointAt(0);
-        this.splitter = new BytesSplitter(200, 4096);
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        CubeManager mgr = CubeManager.getInstance(config);
-        CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
-        CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
-        this.info = new TableRecordInfo(seg);
-        this.rec = new TableRecord(this.info);
-
-        outputKey = new LongWritable();
-        outputValue = new ImmutableBytesWritable(rec.getBytes());
-    }
-
-    /**
-     * Splits one input line by the delimiter, fills the shared record with the
-     * column values and writes it out keyed by its timestamp.
-     *
-     * @throws RuntimeException if the number of columns found in the line
-     *             differs from the column count of the table record info
-     */
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-        // no delimiter was configured: let the splitter detect one from this record
-        if (delim == -1) {
-            delim = splitter.detectDelim(value, info.getColumnCount());
-        }
-
-        int nParts = splitter.split(value.getBytes(), value.getLength(), (byte) delim);
-        SplittedBytes[] parts = splitter.getSplitBuffers();
-
-        if (nParts != info.getColumnCount()) {
-            // report the number of parts actually found, not the size of the split buffer array
-            throw new RuntimeException("Got " + nParts + " from -- " + value.toString() + " -- but only " + info.getColumnCount() + " expected");
-        }
-
-        rec.reset();
-        for (int i = 0; i < nParts; i++) {
-            rec.setValueString(i, Bytes.toString(parts[i].value, 0, parts[i].length));
-        }
-
-        outputKey.set(rec.getTimestamp());
-        // outputValue's backing bytes array is the same as rec
-
-        context.write(outputKey, outputValue);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
deleted file mode 100644
index bd06d74..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.invertedindex.TableRecord;
-import com.kylinolap.cube.invertedindex.TableRecordInfo;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * Partitioner that sends every record to the partition whose number equals
- * the shard reported by the record itself.
- *
- * @author yangli9
- *
- */
-public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
-
-    private Configuration conf;
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    /**
-     * Returns the shard of the record held in {@code value}.
-     * {@code numPartitions} is not consulted.
-     */
-    @Override
-    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
-        rec.setBytes(value.get(), value.getOffset(), value.getLength());
-        return rec.getShard();
-    }
-
-    /**
-     * Stores the configuration and loads the cube segment named in it so that
-     * records can be decoded in {@link #getPartition}.
-     *
-     * @throws RuntimeException wrapping the {@link IOException} raised while
-     *             loading the Kylin metadata
-     */
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        try {
-            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-            CubeManager mgr = CubeManager.getInstance(config);
-            CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
-            CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
-            this.info = new TableRecordInfo(seg);
-            this.rec = new TableRecord(this.info);
-        } catch (IOException e) {
-            // say what failed; the original cause is kept
-            throw new RuntimeException("Failed to initialize InvertedIndexPartitioner from job configuration", e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
deleted file mode 100644
index 09954a1..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.invertedindex.IIKeyValueCodec;
-import com.kylinolap.cube.invertedindex.TableRecord;
-import com.kylinolap.cube.invertedindex.TableRecordInfo;
-import com.kylinolap.cube.invertedindex.Slice;
-import com.kylinolap.cube.invertedindex.SliceBuilder;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * Reducer that appends incoming table records to a {@link SliceBuilder} and
- * writes every finished {@link Slice} as encoded key/value pairs.
- *
- * @author yangli9
- */
-public class InvertedIndexReducer extends Reducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-    // created lazily from the first record; stays null if no record arrives
-    private SliceBuilder builder;
-    private IIKeyValueCodec kv;
-
-    /**
-     * Loads the cube segment named in the job configuration and prepares the
-     * reusable record and the key/value codec.
-     */
-    @Override
-    protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        CubeManager mgr = CubeManager.getInstance(config);
-        CubeInstance cube = mgr.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
-        CubeSegment seg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), CubeSegmentStatusEnum.NEW);
-        info = new TableRecordInfo(seg);
-        rec = new TableRecord(info);
-        builder = null;
-        kv = new IIKeyValueCodec(info);
-    }
-
-    /**
-     * Appends every record of the group to the slice builder and outputs each
-     * slice the builder hands back.
-     */
-    @Override
-    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
-            throws IOException, InterruptedException {
-        for (ImmutableBytesWritable v : values) {
-            rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
-            // the builder is bound to the shard of the first record seen
-            if (builder == null) {
-                builder = new SliceBuilder(info, rec.getShard());
-            }
-            // NOTE(review): prints every record to stdout; looks like debug output
-            System.out.println(rec.getShard() + " - " + rec);
-
-            Slice slice = builder.append(rec);
-            if (slice != null) {
-                output(slice, context);
-            }
-        }
-    }
-
-    /**
-     * Closes the slice builder and outputs the last slice, if any. Does
-     * nothing when no record was received, because then no builder exists.
-     */
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        // cleanup also runs for a reducer without input; avoid a NullPointerException
-        if (builder == null) {
-            return;
-        }
-
-        Slice slice = builder.close();
-        if (slice != null) {
-            output(slice, context);
-        }
-    }
-
-    /**
-     * Encodes the slice into key/value pairs and writes them to the context.
-     */
-    private void output(Slice slice, Context context) throws IOException, InterruptedException {
-        for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getFirst(), pair.getSecond());
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
deleted file mode 100644
index 096ae86..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionJob.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-@SuppressWarnings("static-access")
-public class RandomKeyDistributionJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(RandomKeyDistributionJob.class);
-
- static final Option OPTION_KEY_CLASS = OptionBuilder.withArgName("keyclass").hasArg().isRequired(true).withDescription("Key Class").create("keyclass");
- static final Option OPTION_REGION_MB = OptionBuilder.withArgName("regionmb").hasArg().isRequired(true).withDescription("MB per Region").create("regionmb");
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_KEY_CLASS);
- options.addOption(OPTION_REGION_MB);
-
- parseOptions(options, args);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- job.setJarByClass(this.getClass());
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- String keyClass = getOptionValue(OPTION_KEY_CLASS);
- Class<?> keyClz = Class.forName(keyClass);
-
- int regionMB = Integer.parseInt(getOptionValue(OPTION_REGION_MB));
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RandomKeyDistributionMapper.class);
- job.setMapOutputKeyClass(keyClz);
- job.setMapOutputValueClass(NullWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RandomKeyDistributionReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(keyClz);
- job.setOutputValueClass(NullWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- // total map input MB
- double totalMapInputMB = this.getTotalMapInputMB();
- int regionCount = Math.max(1, (int) (totalMapInputMB / regionMB));
- int mapSampleNumber = 1000;
- System.out.println("Total Map Input MB: " + totalMapInputMB);
- System.out.println("Region Count: " + regionCount);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.MAPPER_SAMPLE_NUMBER, String.valueOf(mapSampleNumber));
- job.getConfiguration().set(BatchConstants.REGION_NUMBER, String.valueOf(regionCount));
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RandomKeyDistributionJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
deleted file mode 100644
index c434f69..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.kylinolap.common.util.RandomSampler;
-import com.kylinolap.job.constant.BatchConstants;
-
-/**
- * Mapper that keeps a copy of every input key and, once all input has been
- * read, emits a random sample of those keys.
- *
- * @author ysong1
- *
- */
-public class RandomKeyDistributionMapper<KEY extends Writable, VALUE> extends Mapper<KEY, VALUE, KEY, NullWritable> {
-
-    private Configuration conf;
-    // number of keys to sample, read from the job configuration
-    private int sampleNumber;
-    // copies of all keys seen by this mapper
-    private List<KEY> allKeys;
-
-    /**
-     * Reads the sample size from the job configuration and creates the key buffer.
-     */
-    @Override
-    protected void setup(Context context) throws IOException {
-        this.conf = context.getConfiguration();
-        this.allKeys = new ArrayList<KEY>();
-        String configuredSampleNumber = this.conf.get(BatchConstants.MAPPER_SAMPLE_NUMBER);
-        this.sampleNumber = Integer.parseInt(configuredSampleNumber);
-    }
-
-    /**
-     * Stores a copy of the key; the value is ignored and nothing is emitted here.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException {
-        // a fresh instance is filled with the key's content, so the stored key is independent of the input object
-        KEY snapshot = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
-        ReflectionUtils.copy(conf, key, snapshot);
-        allKeys.add(snapshot);
-    }
-
-    /**
-     * Samples {@code sampleNumber} keys from the buffered keys and writes each of them out.
-     */
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        List<KEY> sampledKeys = new RandomSampler<KEY>().sample(allKeys, sampleNumber);
-        for (KEY sampledKey : sampledKeys) {
-            context.write(sampledKey, NullWritable.get());
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
deleted file mode 100644
index f5475f2..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.kylinolap.job.constant.BatchConstants;
-
-/**
- * Reducer that buffers a copy of every key it receives and, at the end, emits
- * keys at a regular step so that they can serve as split points.
- *
- * @author ysong1
- *
- */
-public class RandomKeyDistributionReducer<KEY extends Writable> extends Reducer<KEY, NullWritable, KEY, NullWritable> {
-
-    private Configuration conf;
-    // desired number of regions, read from the job configuration
-    private int regionNumber;
-    // copies of all keys in the order they were reduced
-    private List<KEY> allSplits;
-
-    /**
-     * Reads the region number from the job configuration and creates the key buffer.
-     */
-    @Override
-    protected void setup(Context context) throws IOException {
-        conf = context.getConfiguration();
-        allSplits = new ArrayList<KEY>();
-        regionNumber = Integer.parseInt(context.getConfiguration().get(BatchConstants.REGION_NUMBER));
-    }
-
-    /**
-     * Stores a copy of the key; the values are ignored and nothing is emitted here.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void reduce(KEY key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
-        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
-        ReflectionUtils.copy(conf, key, keyCopy);
-        allSplits.add(keyCopy);
-    }
-
-    /**
-     * Emits every {@code stepLength}-th buffered key, starting at index
-     * {@code stepLength}, where {@code stepLength} is the number of buffered
-     * keys divided by the region number, but at least 1.
-     */
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        // with fewer keys than regions the quotient is 0, which would never advance the loop
-        int stepLength = Math.max(1, allSplits.size() / regionNumber);
-        for (int i = stepLength; i < allSplits.size(); i += stepLength) {
-            context.write(allSplits.get(i), NullWritable.get());
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java b/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
deleted file mode 100644
index 28237ca..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/CubeMigrationCLI.java
+++ /dev/null
@@ -1,418 +0,0 @@
-package com.kylinolap.job.tools;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.JsonSerializer;
-import com.kylinolap.common.persistence.ResourceStore;
-import com.kylinolap.common.persistence.Serializer;
-import com.kylinolap.cube.*;
-import com.kylinolap.cube.project.ProjectInstance;
-import com.kylinolap.dict.DictionaryInfo;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.dict.lookup.SnapshotManager;
-import com.kylinolap.dict.lookup.SnapshotTable;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.schema.TableDesc;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by honma on 9/3/14.
- * <p/>
- * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from
- * dev env to test(prod) env, or vice versa.
- * <p/>
- * Note that different envs are assumed to share the same hadoop cluster,
- * including hdfs, hbase and hive.
- */
-public class CubeMigrationCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class);
-
- private static List<Opt> operations;
- private static KylinConfig srcConfig;
- private static KylinConfig dstConfig;
- private static ResourceStore srcStore;
- private static ResourceStore dstStore;
- private static FileSystem hdfsFS;
- private static HBaseAdmin hbaseAdmin;
-
- public static void main(String[] args) throws IOException, InterruptedException {
-
- if (args.length != 6) {
- usage();
- System.exit(1);
- }
-
- moveCube(args[0], args[1], args[2], args[3], args[4], args[5]);
- }
-
- private static void usage() {
- System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName overwriteIfExists realExecute");
- System.out.println(
- " srcKylinConfigUri: The KylinConfig of the cube’s source \n" +
- "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" +
- "cubeName: the name of cube to be migrated. \n" +
- "projectName: The target project in the target environment.(Make sure it exist) \n" +
- "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" +
- "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
-
- }
-
- public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
-
- srcConfig = srcCfg;
- srcStore = ResourceStore.getStore(srcConfig);
- dstConfig = dstCfg;
- dstStore = ResourceStore.getStore(dstConfig);
-
- CubeManager cubeManager = CubeManager.getInstance(srcConfig);
- CubeInstance cube = cubeManager.getCube(cubeName);
- logger.info("cube to be moved is : " + cubeName);
-
- if (cube.getStatus() != CubeStatusEnum.READY)
- throw new IllegalStateException("Cannot migrate cube that is not in READY state.");
-
- for (CubeSegment segment : cube.getSegments()) {
- if (segment.getStatus() != CubeSegmentStatusEnum.READY) {
- throw new IllegalStateException("At least one segment is not in READY state");
- }
- }
-
- checkAndGetHbaseUrl();
-
- Configuration conf = HBaseConfiguration.create();
- hbaseAdmin = new HBaseAdmin(conf);
-
- hdfsFS = FileSystem.get(new Configuration());
-
- operations = new ArrayList<Opt>();
-
- copyFilesInMetaStore(cube, overwriteIfExists);
- renameFoldersInHdfs(cube);
- changeHtableHost(cube);
- addCubeIntoProject(cubeName, projectName);
-
- if (realExecute.equalsIgnoreCase("true")) {
- doOpts();
- } else {
- showOpts();
- }
- }
-
- public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
-
- moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, overwriteIfExists, realExecute);
- }
-
- private static String checkAndGetHbaseUrl() {
- String srcMetadataUrl = srcConfig.getMetadataUrl();
- String dstMetadataUrl = dstConfig.getMetadataUrl();
-
- logger.info("src metadata url is " + srcMetadataUrl);
- logger.info("dst metadata url is " + dstMetadataUrl);
-
- int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase:");
- int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase:");
- if (srcIndex < 0 || dstIndex < 0)
- throw new IllegalStateException("Both metadata urls should be hbase metadata url");
-
- String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim();
- String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim();
- if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) {
- throw new IllegalStateException("hbase url not equal! ");
- }
-
- logger.info("hbase url is " + srcHbaseUrl.trim());
- return srcHbaseUrl.trim();
- }
-
- private static void renameFoldersInHdfs(CubeInstance cube) {
- for (CubeSegment segment : cube.getSegments()) {
-
- String jobUuid = segment.getLastBuildJobID();
- String src = JobInstance.getJobWorkingDir(jobUuid, srcConfig.getHdfsWorkingDirectory());
- String tgt = JobInstance.getJobWorkingDir(jobUuid, dstConfig.getHdfsWorkingDirectory());
-
- operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
- }
-
- }
-
- private static void changeHtableHost(CubeInstance cube) {
- for (CubeSegment segment : cube.getSegments()) {
- operations.add(new Opt(OptType.CHANGE_HTABLE_HOST,
- new Object[] { segment.getStorageLocationIdentifier() }));
- }
- }
-
- private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException {
-
- List<String> metaItems = new ArrayList<String>();
- List<String> dictAndSnapshot = new ArrayList<String>();
- listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
-
- if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true"))
- throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
-
- for (String item : metaItems) {
- operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
- }
-
- for (String item : dictAndSnapshot) {
- operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
- }
- }
-
- private static void addCubeIntoProject(String cubeName, String projectName) throws IOException {
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- if (!dstStore.exists(projectResPath))
- throw new IllegalStateException("The target project " + projectName + "does not exist");
-
- operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { cubeName, projectName }));
- }
-
- private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, List<String> dictAndSnapshot) throws IOException {
-
- CubeDesc cubeDesc = cube.getDescriptor();
- metaResource.add(cube.getResourcePath());
- metaResource.add(cubeDesc.getResourcePath());
-
- for (TableDesc tableDesc : cubeDesc.listTables()) {
- metaResource.add(tableDesc.getResourcePath());
- }
-
- for (CubeSegment segment : cube.getSegments()) {
- dictAndSnapshot.addAll(segment.getSnapshotPaths());
- dictAndSnapshot.addAll(segment.getDictionaryPaths());
- }
- }
-
- private static enum OptType {
- COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST
- }
-
- private static class Opt {
- private OptType type;
- private Object[] params;
-
- private Opt(OptType type, Object[] params) {
- this.type = type;
- this.params = params;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(type).append(":");
- for (Object s : params)
- sb.append(s).append(", ");
- return sb.toString();
- }
-
- }
-
- private static void showOpts() {
- for (int i = 0; i < operations.size(); ++i) {
- showOpt(operations.get(i));
- }
- }
-
- private static void showOpt(Opt opt) {
- logger.info("Operation: " + opt.toString());
- }
-
- private static void doOpts() throws IOException, InterruptedException {
- int index = 0;
- try {
- for (; index < operations.size(); ++index) {
- logger.info("Operation index :" + index);
- doOpt(operations.get(index));
- }
- } catch (Exception e) {
- logger.error("error met", e);
- logger.info("Try undoing previous changes");
- // undo:
- for (int i = index; i >= 0; --i) {
- try {
- undo(operations.get(i));
- } catch (Exception ee) {
- logger.error("error met ", e);
- logger.info("Continue undoing...");
- }
- }
-
- throw new RuntimeException("Cube moving failed");
- }
- }
-
- private static void doOpt(Opt opt) throws IOException, InterruptedException {
- logger.info("Executing operation: " + opt.toString());
-
- switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(CubeManager.getHtableMetadataKey(), dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- logger.info("CHANGE_HTABLE_HOST is completed");
- break;
- }
- case COPY_FILE_IN_META: {
- String item = (String) opt.params[0];
- InputStream inputStream = srcStore.getResource(item);
- long ts = srcStore.getResourceTimestamp(item);
- dstStore.putResource(item, inputStream, ts);
- inputStream.close();
- logger.info("Item " + item + " is copied");
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- String item = (String) opt.params[0];
-
- if (item.toLowerCase().endsWith(".dict")) {
- DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
- DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
- DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
-
- long ts = dictSrc.getLastModified();
- dictSrc.setLastModified(0);//to avoid resource store write conflict
- DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc);
- dictSrc.setLastModified(ts);
-
- if (dictSaved == dictSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
- } else {
- //dictSrc is rejected because of duplication
- //modify cube's dictionary path
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(dictSaved.getResourcePath());
- }
- }
- }
- dstStore.putResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
- }
-
- } else if (item.toLowerCase().endsWith(".snapshot")) {
- SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
- SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
- SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
-
- long ts = snapSrc.getLastModified();
- snapSrc.setLastModified(0);
- SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
- snapSrc.setLastModified(ts);
-
-
- if (snapSaved == snapSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
-
- } else {
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(snapSaved.getResourcePath());
- }
- }
- }
- dstStore.putResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
-
- }
-
- } else {
- logger.error("unknown item found: " + item);
- logger.info("ignore it");
- }
-
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[0];
- String dstPath = (String) opt.params[1];
- hdfsFS.rename(new Path(srcPath), new Path(dstPath));
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
- break;
- }
- case ADD_INTO_PROJECT: {
- String cubeName = (String) opt.params[0];
- String projectName = (String) opt.params[1];
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
- ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
- project.removeCube(cubeName);
- project.addCube(cubeName);
- dstStore.putResource(projectResPath, project, projectSerializer);
- logger.info("Project instance for " + projectName + " is corrected");
- break;
- }
- }
- }
-
- private static void undo(Opt opt) throws IOException, InterruptedException {
- logger.info("Undo operation: " + opt.toString());
-
- switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(CubeManager.getHtableMetadataKey(), srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- break;
- }
- case COPY_FILE_IN_META: {
- // no harm
- logger.info("Undo for COPY_FILE_IN_META is ignored");
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- // no harm
- logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[1];
- String dstPath = (String) opt.params[0];
-
- if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
- hdfsFS.rename(new Path(srcPath), new Path(dstPath));
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
- }
- break;
- }
- case ADD_INTO_PROJECT: {
- logger.info("Undo for ADD_INTO_PROJECT is ignored");
- break;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index 8c81e73..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.tools;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
- /** Log object for this class. */
- private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
- private SSLContext sslcontext = null;
-
- /**
- * Constructor for DefaultSslProtocolSocketFactory.
- */
- public DefaultSslProtocolSocketFactory() {
- super();
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
- */
- public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
- }
-
- /**
- * Attempts to get a new socket connection to the given host within the
- * given time limit.
- *
- * <p>
- * To circumvent the limitations of older JREs that do not support connect
- * timeout a controller thread is executed. The controller thread attempts
- * to create a new socket within the given limit of time. If socket
- * constructor does not return until the timeout expires, the controller
- * terminates and throws an {@link ConnectTimeoutException}
- * </p>
- *
- * @param host
- * the host name/IP
- * @param port
- * the port on the host
- * @param localAddress
- * the local host name/IP to bind the socket to
- * @param localPort
- * the port on the local machine
- * @param params
- * {@link HttpConnectionParams Http connection parameters}
- *
- * @return Socket a new socket
- *
- * @throws IOException
- * if an I/O error occurs while creating the socket
- * @throws UnknownHostException
- * if the IP address of the host cannot be determined
- * @throws ConnectTimeoutException
- * DOCUMENT ME!
- * @throws IllegalArgumentException
- * DOCUMENT ME!
- */
- public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
- if (params == null) {
- throw new IllegalArgumentException("Parameters may not be null");
- }
-
- int timeout = params.getConnectionTimeout();
-
- if (timeout == 0) {
- return createSocket(host, port, localAddress, localPort);
- } else {
- // To be eventually deprecated when migrated to Java 1.4 or above
- return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
- }
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
- */
- public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(host, port);
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
- */
- public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
- }
-
- public boolean equals(Object obj) {
- return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
- }
-
- public int hashCode() {
- return DefaultX509TrustManager.class.hashCode();
- }
-
- private static SSLContext createEasySSLContext() {
- try {
- SSLContext context = SSLContext.getInstance("TLS");
- context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
-
- return context;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new HttpClientError(e.toString());
- }
- }
-
- private SSLContext getSSLContext() {
- if (this.sslcontext == null) {
- this.sslcontext = createEasySSLContext();
- }
-
- return this.sslcontext;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java b/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
deleted file mode 100644
index 8dedaa6..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/DefaultX509TrustManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.tools;
-
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link X509TrustManager} that does not reject any certificate chain.
- * <p>
- * SECURITY NOTE: {@link #checkClientTrusted} and {@link #checkServerTrusted}
- * are empty, so every client and server certificate is accepted and TLS
- * connections using this trust manager are not protected against
- * impersonation. The platform trust manager is only consulted for
- * {@link #getAcceptedIssuers()}.
- *
- * @author xduo
- *
- */
-public class DefaultX509TrustManager implements X509TrustManager {
-
-    /** Log object for this class. */
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
-    private final X509TrustManager standardTrustManager;
-
-    /**
-     * Constructor for DefaultX509TrustManager.
-     *
-     * @param keystore
-     *            key store handed to the {@link TrustManagerFactory}; may be
-     *            {@code null}
-     * @throws NoSuchAlgorithmException
-     *             if the default trust manager algorithm is unavailable or
-     *             yields no {@link X509TrustManager}
-     * @throws KeyStoreException
-     *             if the factory cannot be initialised from the key store
-     */
-    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
-        super();
-
-        // Ask the TrustManagerFactory for its own default algorithm; the
-        // KeyManagerFactory default belongs to a different factory.
-        TrustManagerFactory factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        factory.init(keystore);
-
-        // Pick the first X509 trust manager rather than casting element 0
-        // blindly, which would fail with a ClassCastException on another type.
-        X509TrustManager x509Manager = null;
-        for (TrustManager candidate : factory.getTrustManagers()) {
-            if (candidate instanceof X509TrustManager) {
-                x509Manager = (X509TrustManager) candidate;
-                break;
-            }
-        }
-
-        if (x509Manager == null) {
-            throw new NoSuchAlgorithmException("SunX509 trust manager not supported");
-        }
-
-        this.standardTrustManager = x509Manager;
-    }
-
-    /** Returns the issuers accepted by the platform trust manager. */
-    public X509Certificate[] getAcceptedIssuers() {
-        return this.standardTrustManager.getAcceptedIssuers();
-    }
-
-    /**
-     * Always reports the client chain as trusted; the platform trust manager
-     * is deliberately not consulted.
-     */
-    public boolean isClientTrusted(X509Certificate[] certificates) {
-        return true;
-    }
-
-    /**
-     * Reports the server chain as trusted unless it consists of exactly one
-     * certificate that is outside its validity period. The chain is logged at
-     * debug level.
-     */
-    public boolean isServerTrusted(X509Certificate[] certificates) {
-        if ((certificates != null) && LOG.isDebugEnabled()) {
-            LOG.debug("Server certificate chain:");
-
-            for (int i = 0; i < certificates.length; i++) {
-                LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
-            }
-        }
-
-        if ((certificates != null) && (certificates.length == 1)) {
-            X509Certificate certificate = certificates[0];
-
-            try {
-                certificate.checkValidity();
-            } catch (CertificateException e) {
-                LOG.error(e.toString());
-
-                return false;
-            }
-
-            return true;
-        } else {
-            // Longer (or missing) chains are accepted without delegating to
-            // the platform trust manager.
-            return true;
-        }
-    }
-
-    /**
-     * Accepts every client certificate chain.
-     */
-    @Override
-    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-        // NOTE(review): intentionally empty, i.e. no validation at all.
-        // Delegating to standardTrustManager would enable real checks but
-        // changes behaviour for existing callers -- decide explicitly.
-    }
-
-    /**
-     * Accepts every server certificate chain.
-     */
-    @Override
-    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-        // NOTE(review): intentionally empty, i.e. any server certificate is
-        // trusted, including self-signed and expired ones. Delegating to
-        // standardTrustManager would enable real checks but changes behaviour
-        // for existing callers -- decide explicitly.
-    }
-
-}