Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:46:40 UTC
[19/51] [partial] incubator-kylin git commit: migrate repo from github.com to apache git
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
new file mode 100644
index 0000000..627c397
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.common.RowKeySplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.cube.cuboid.Cuboid;
+import com.kylinolap.cube.kv.RowConstants;
+import com.kylinolap.dict.Dictionary;
+import com.kylinolap.dict.DictionaryManager;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.TblColRef;
+
+/**
+ * @author ysong1, honma
+ */
+public class MergeCuboidMapper extends Mapper<Text, Text, Text, Text> {
+
+ private KylinConfig config;
+ private String cubeName;
+ private String segmentName;
+ private CubeManager cubeManager;
+ private CubeInstance cube;
+ private CubeDesc cubeDesc;
+ private CubeSegment mergedCubeSegment;
+ private CubeSegment sourceCubeSegment;// must be unique during a mapper's life cycle
+
+ private Text outputKey = new Text();
+
+ private byte[] newKeyBuf;
+ private RowKeySplitter rowKeySplitter;
+
+ private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+
+ private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
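+ // A column's dictionary needs merging only if the column is dictionary-encoded
+ // and its dictionary source is the fact table, since such dictionaries are built per segment.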
+ private Boolean checkNeedMerging(TblColRef col) throws IOException {
+ Boolean ret = dictsNeedMerging.get(col);
+ if (ret != null)
+ return ret;
+ else {
+ ret = cubeDesc.getRowkey().isUseDictionary(col) && cubeDesc.getFactTable().equalsIgnoreCase((String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc, col, null)[0]);
+ dictsNeedMerging.put(col, ret);
+ return ret;
+ }
+ }
+
+ private String extractJobIDFromPath(String path) {
+ Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+ // check the first occurrence
+ if (matcher.find()) {
+ return matcher.group(1);
+ } else {
+ throw new IllegalStateException("Can not extract job ID from file path : " + path);
+ }
+ }
+
+ private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ if (segment.getUuid().equalsIgnoreCase(jobID)) {
+ return segment;
+ }
+ }
+
+ throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
+
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+ config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+
+ cubeManager = CubeManager.getInstance(config);
+ cube = cubeManager.getCube(cubeName);
+ cubeDesc = cube.getDescriptor();
+ mergedCubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
+
+ // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+ newKeyBuf = new byte[256];// size will auto-grow
+
+ // decide which source segment
+ org.apache.hadoop.mapreduce.InputSplit inputSplit = context.getInputSplit();
+ String filePath = ((FileSplit) inputSplit).getPath().toString();
+ String jobID = extractJobIDFromPath(filePath);
+ sourceCubeSegment = findSegmentWithUuid(jobID, cube);
+
+ this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+ }
+
+ @Override
+ public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
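+ // Re-encode the row key for the merged segment: copy the cuboid ID, then
+ // translate each fact-table dictionary column from the source segment's
+ // dictionary ID to the merged segment's dictionary ID; all other columns
+ // are copied unchanged.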
+ long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
+ Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+
+ SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+ int bufOffset = 0;
+ BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
+ bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+ TblColRef col = cuboid.getColumns().get(i);
+
+ if (this.checkNeedMerging(col)) {
+ // if dictionary on fact table column, needs rewrite
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
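+ // grow newKeyBuf until it can hold the longest value of either dictionary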
+ while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
+ int idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
+
+ bufOffset += mergedDict.getSizeOfId();
+ } else {
+ // keep as it is
+ while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBuf;
+ newKeyBuf = new byte[2 * newKeyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+ }
+
+ System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
+ bufOffset += splittedByteses[i + 1].length;
+ }
+ }
+ byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
+ outputKey.set(newKey, 0, newKey.length);
+
+ context.write(outputKey, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java
new file mode 100644
index 0000000..e06ef46
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.cube;
+
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+
+public class NDCuboidJob extends CuboidJob {
+
+ public NDCuboidJob() {
+ this.setMapperClass(NDCuboidMapper.class);
+ }
+
+ public static void main(String[] args) throws Exception {
+ CuboidJob job = new NDCuboidJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java
new file mode 100644
index 0000000..b1f08b0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.common.RowKeySplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.cube.cuboid.Cuboid;
+import com.kylinolap.cube.cuboid.CuboidScheduler;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+public class NDCuboidMapper extends Mapper<Text, Text, Text, Text> {
+
+ private static final Logger logger = LoggerFactory.getLogger(NDCuboidMapper.class);
+
+ private Text outputKey = new Text();
+ private String cubeName;
+ private String segmentName;
+ private CubeDesc cubeDesc;
+ private CuboidScheduler cuboidScheduler;
+
+ private int handleCounter;
+ private int skipCounter;
+
+ private byte[] keyBuf = new byte[4096];
+ private RowKeySplitter rowKeySplitter;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+
+ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ CubeSegment cubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
+ cubeDesc = cube.getDescriptor();
+
+ // initialize CuboidScheduler
+ cuboidScheduler = new CuboidScheduler(cubeDesc);
+
+ rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+ }
+
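+ // Build the child cuboid's row key: write the child cuboid ID, then copy
+ // only those parent row-key columns whose bits are still set in the child cuboid ID.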
+ private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
+ int offset = 0;
+
+ // cuboid id
+ System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
+ offset += childCuboid.getBytes().length;
+
+ // rowkey columns
+ long mask = Long.highestOneBit(parentCuboid.getId());
+ long parentCuboidId = parentCuboid.getId();
+ long childCuboidId = childCuboid.getId();
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
+ int index = 1; // skip cuboidId
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+ if ((mask & parentCuboidId) > 0) {// parent cuboid has this column
+ if ((mask & childCuboidId) > 0) {// child cuboid also keeps this column
+ System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length);
+ offset += splitBuffers[index].length;
+ }
+ index++;
+ }
+ mask = mask >> 1;
+ }
+
+ return offset;
+ }
+
+ @Override
+ public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
+ Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
+
+ Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
+
+ // if still empty or null
+ if (myChildren == null || myChildren.size() == 0) {
+ context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Skipped records").increment(1L);
+ skipCounter++;
+ if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Skipped " + skipCounter + " records!");
+ }
+ return;
+ }
+
+ context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Processed records").increment(1L);
+
+ handleCounter++;
+ if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + handleCounter + " records!");
+ }
+
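+ // emit one record per spanned child cuboid, re-keyed with that cuboid's row key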
+ for (Long child : myChildren) {
+ Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
+ int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+ outputKey.set(keyBuf, 0, keyLength);
+ context.write(outputKey, value);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java
new file mode 100644
index 0000000..03ea2a0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.Array;
+import com.kylinolap.common.util.ByteArray;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.common.BytesSplitter;
+import com.kylinolap.cube.common.SplittedBytes;
+import com.kylinolap.cube.cuboid.Cuboid;
+import com.kylinolap.cube.kv.AbstractRowKeyEncoder;
+import com.kylinolap.cube.kv.RowConstants;
+import com.kylinolap.cube.measure.MeasureCodec;
+import com.kylinolap.dict.lookup.HiveTable;
+import com.kylinolap.dict.lookup.LookupBytesTable;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.DimensionDesc;
+import com.kylinolap.metadata.model.cube.FunctionDesc;
+import com.kylinolap.metadata.model.cube.JoinDesc;
+import com.kylinolap.metadata.model.cube.MeasureDesc;
+import com.kylinolap.metadata.model.cube.ParameterDesc;
+import com.kylinolap.metadata.model.cube.TblColRef;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * @author George Song (ysong1),honma
+ */
+public class NewBaseCuboidMapper<KEYIN> extends Mapper<KEYIN, Text, Text, Text> {
+
+ private static final Logger logger = LoggerFactory.getLogger(NewBaseCuboidMapper.class);
+
+ private String cubeName;
+ private String segmentName;
+ private Cuboid baseCuboid;
+ private CubeInstance cube;
+ private CubeSegment cubeSegment;
+
+ private CubeDesc cubeDesc;
+ private MetadataManager metadataManager;
+ private TableDesc factTableDesc;
+
+ private boolean byteRowDelimiterInferred = false;
+ private byte byteRowDelimiter;
+
+ private int counter;
+ private Text outputKey = new Text();
+ private Text outputValue = new Text();
+ private Object[] measures;
+ private byte[][] keyBytesBuf;
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+ private BytesSplitter bytesSplitter;
+ private AbstractRowKeyEncoder rowKeyEncoder;
+ private MeasureCodec measureCodec;
+
+ // deal with table join
+ private HashMap<String, LookupBytesTable> lookupTables;// name -> table
+ private LinkedList<TableJoin> tableJoins;
+ private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// analogous to TableJoin.dimTblColAsRowKey
+ private int[][] measureColumnIndice;
+ private byte[] nullValue;
+
+ private class TableJoin {
+ public LinkedList<Integer> fkIndice;// zero-based foreign key column indices on the fact table
+ public String lookupTableName;
+ public String joinType;
+
+ // Pair.first -> zero-based column index in lookup table
+ // Pair.second -> zero based row key index
+ public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey;
+
+ private TableJoin(String joinType, LinkedList<Integer> fkIndice, String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) {
+ this.joinType = joinType;
+ this.fkIndice = fkIndice;
+ this.lookupTableName = lookupTableName;
+ this.dimTblColAsRowKey = dimTblColAsRowKey;
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+
+ metadataManager = MetadataManager.getInstance(config);
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
+ cubeDesc = cube.getDescriptor();
+ factTableDesc = metadataManager.getTableDesc(cubeDesc.getFactTable());
+
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+ // intermediateTableDesc = new
+ // JoinedFlatTableDesc(cube.getDescriptor());
+
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+
+ measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+ measures = new Object[cubeDesc.getMeasures().size()];
+
+ int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+ keyBytesBuf = new byte[colCount][];
+
+ bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 4096);
+
+ nullValue = new byte[] { (byte) '\\', (byte) 'N' };// as in Hive, null values are represented by \N
+
+ prepareJoins();
+ prepareMetrics();
+ }
+
+ private void prepareJoins() throws IOException {
+ this.lookupTables = new HashMap<String, LookupBytesTable>();
+ this.tableJoins = new LinkedList<TableJoin>();
+ this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
+
+ for (DimensionDesc dim : cubeDesc.getDimensions()) {
+ JoinDesc join = dim.getJoin();
+ if (join != null) {
+ String joinType = join.getType().toUpperCase();
+ String lookupTableName = dim.getTable();
+
+ // load lookup tables
+ if (!lookupTables.containsKey(lookupTableName)) {
+ HiveTable htable = new HiveTable(metadataManager, lookupTableName);
+ LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable);
+ lookupTables.put(lookupTableName, btable);
+ }
+
+ // create join infos
+ LinkedList<Integer> fkIndice = new LinkedList<Integer>();
+ for (TblColRef colRef : join.getForeignKeyColumns()) {
+ fkIndice.add(colRef.getColumn().getZeroBasedIndex());
+ }
+ this.tableJoins.add(new TableJoin(joinType, fkIndice, lookupTableName, this.findColumnRowKeyRelationships(dim)));
+
+ } else {
+
+ this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim));
+ }
+ }
+
+ // put composite-key joins ahead of single-key joins
+ Collections.sort(tableJoins, new Comparator<TableJoin>() {
+ @Override
+ public int compare(TableJoin o1, TableJoin o2) {
+ return Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size()));
+ }
+ });
+ }
+
+ private LinkedList<Pair<Integer, Integer>> findColumnRowKeyRelationships(DimensionDesc dim) {
+ LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
+ for (TblColRef colRef : dim.getColumnRefs()) {
+ int dimTableIndex = colRef.getColumn().getZeroBasedIndex();
+ int rowKeyIndex = cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName());
+ dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, rowKeyIndex));
+ }
+ return dimTblColAsRowKey;
+ }
+
+ private void prepareMetrics() {
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
+ int measureSize = measures.size();
+ measureColumnIndice = new int[measureSize][];
+ for (int i = 0; i < measureSize; i++) {
+ FunctionDesc func = measures.get(i).getFunction();
+ List<TblColRef> colRefs = func.getParameter().getColRefs();
+ if (colRefs == null) {
+ measureColumnIndice[i] = null;
+ } else {
+ measureColumnIndice[i] = new int[colRefs.size()];
+ for (int j = 0; j < colRefs.size(); j++) {
+ TblColRef c = colRefs.get(j);
+ int factTblIdx = factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex();
+ measureColumnIndice[i][j] = factTblIdx;
+ }
+ }
+ }
+ }
+
+ private byte[] trimSplitBuffer(SplittedBytes splittedBytes) {
+ return Arrays.copyOf(splittedBytes.value, splittedBytes.length);
+ }
+
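+ // Assemble the base cuboid row key for one fact row: look up each joined
+ // dimension table by its foreign key and fill the corresponding row-key
+ // slots, then fill the remaining slots directly from the fact row.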
+ private byte[] buildKey(SplittedBytes[] splitBuffers) {
+
+ int filledDimension = 0;// debug
+
+ // join lookup tables, and fill into RowKey the columns in lookup table
+ for (TableJoin tableJoin : this.tableJoins) {
+ String dimTblName = tableJoin.lookupTableName;
+ LookupBytesTable dimTbl = this.lookupTables.get(dimTblName);
+ ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()];
+ for (int i = 0; i < tableJoin.fkIndice.size(); ++i) {
+ rawKey[i] = new ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)]));
+ }
+ Array<ByteArray> key = new Array<ByteArray>(rawKey);
+ ByteArray[] dimRow = dimTbl.getRow(key);
+ if (dimRow == null) {
+ if (tableJoin.joinType.equalsIgnoreCase("INNER")) {
+ return null;
+ } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) {
+ for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
+ keyBytesBuf[relation.getSecond()] = nullValue;
+ filledDimension++;
+ }
+ }
+ } else {
+ for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
+ keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].data;
+ filledDimension++;
+ }
+ }
+ }
+
+ // fill into RowKey the columns in fact table
+ for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) {
+ keyBytesBuf[relation.getSecond()] = trimSplitBuffer(splitBuffers[relation.getFirst()]);
+ filledDimension++;
+ }
+
+ assert filledDimension == keyBytesBuf.length;
+
+ // all the row key slots(keyBytesBuf) should be complete now
+ return rowKeyEncoder.encode(keyBytesBuf);
+ }
+
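+ // Serialize all measure values of the current row into valueBuf via the measure codec.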
+ private void buildValue(SplittedBytes[] splitBuffers) {
+
+ for (int i = 0; i < measures.length; i++) {
+ byte[] valueBytes = getValueBytes(splitBuffers, i);
+ measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+ }
+
+ valueBuf.clear();
+ measureCodec.encode(measures, valueBuf);
+ }
+
+ private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
+ MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
+ ParameterDesc paramDesc = desc.getFunction().getParameter();
+ int[] flatTableIdx = this.measureColumnIndice[measureIdx];
+
+ byte[] result = null;
+
+ // constant
+ if (flatTableIdx == null) {
+ result = Bytes.toBytes(paramDesc.getValue());
+ }
+ // column values
+ else {
+ for (int i = 0; i < flatTableIdx.length; i++) {
+ SplittedBytes split = splitBuffers[flatTableIdx[i]];
+ result = Arrays.copyOf(split.value, split.length);
+ }
+ }
+
+ if (desc.getFunction().isCount()) {
+ result = Bytes.toBytes("1");
+ }
+
+ return result;
+ }
+
+ @Override
+ public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+ // This mapper combines the Hive table flattening logic with base cuboid building;
+ // its input is the raw fact table rows.
+
+ counter++;
+ if (counter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + counter + " records!");
+ }
+
+ if (!byteRowDelimiterInferred)
+ byteRowDelimiter = bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), factTableDesc.getColumns().length);
+
+ bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
+
+ try {
+ byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+ if (rowKey == null)
+ return;// skip this fact table row
+
+ outputKey.set(rowKey, 0, rowKey.length);
+
+ buildValue(bytesSplitter.getSplitBuffers());
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+ context.write(outputKey, outputValue);
+
+ } catch (Throwable t) {
+ logger.error("", t);
+ context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Error records").increment(1L);
+ return;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java
new file mode 100644
index 0000000..8245266
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.File;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
+
+/**
+ * @author xjiang, ysong1
+ *
+ */
+
+public class RangeKeyDistributionJob extends AbstractHadoopJob {
+ protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+ */
+ @Override
+ public int run(String[] args) throws Exception {
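+ // Single-reducer job: the mapper samples (row key, accumulated bytes) roughly
+ // once per megabyte, and the reducer emits cut-point keys whose spacing is
+ // chosen from the cube's declared capacity.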
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+
+ parseOptions(options, args);
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ job = Job.getInstance(getConf(), jobName);
+
+ File jarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+ if (jarFile.exists()) {
+ job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
+ } else {
+ job.setJarByClass(this.getClass());
+ }
+
+ addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ FileOutputFormat.setOutputPath(job, output);
+ // job.getConfiguration().set("dfs.block.size", "67108864");
+
+ // Mapper
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(RangeKeyDistributionMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+
+ // Reducer - only one
+ job.setReducerClass(RangeKeyDistributionReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setNumReduceTasks(1);
+
+ this.deletePath(job.getConfiguration(), output);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeCapacity cubeCapacity = cube.getDescriptor().getCapacity();
+ job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, cubeCapacity.toString());
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ printUsage(options);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java
new file mode 100644
index 0000000..f02ae1a
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RangeKeyDistributionMapper extends Mapper<Text, Text, Text, LongWritable> {
+
+ private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
+
+ private LongWritable outputValue = new LongWritable(0);
+
+ private long bytesRead = 0;
+
+ private Text lastKey;
+
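+ // Emit the current key with the accumulated byte count roughly once per megabyte
+ // of input, giving the reducer a coarse size distribution over the key range.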
+ @Override
+ public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ lastKey = key;
+
+ int bytesLength = key.getLength() + value.getLength();
+ bytesRead += bytesLength;
+
+ if (bytesRead >= ONE_MEGA_BYTES) {
+ outputValue.set(bytesRead);
+ context.write(key, outputValue);
+
+ // reset bytesRead
+ bytesRead = 0;
+ }
+
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ if (lastKey != null) {
+ outputValue.set(bytesRead);
+ context.write(lastKey, outputValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
new file mode 100644
index 0000000..dafea36
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+import com.kylinolap.job.constant.BatchConstants;
+import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RangeKeyDistributionReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+ public static final long FIVE_GIGA_BYTES = 5L * 1024L * 1024L * 1024L;
+ public static final long TEN_GIGA_BYTES = 10L * 1024L * 1024L * 1024L;
+ public static final long TWENTY_GIGA_BYTES = 20L * 1024L * 1024L * 1024L;
+
+ private LongWritable outputValue = new LongWritable(0);
+
+ private long bytesRead = 0;
+ private Text lastKey;
+
+ private CubeCapacity cubeCapacity;
+ private long cut;
+
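+ // Accumulate record sizes and emit a key each time the running total crosses
+ // the capacity-based cut (5/10/20 GB for SMALL/MEDIUM/LARGE cubes).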
+ @Override
+ protected void setup(Context context) throws IOException {
+ cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
+ switch (cubeCapacity) {
+ case SMALL:
+ cut = FIVE_GIGA_BYTES;
+ break;
+ case MEDIUM:
+ cut = TEN_GIGA_BYTES;
+ break;
+ case LARGE:
+ cut = TWENTY_GIGA_BYTES;
+ break;
+ }
+ }
+
+ @Override
+ public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ lastKey = key;
+ long length = 0;
+ for (LongWritable v : values) {
+ length += v.get();
+ }
+
+ bytesRead += length;
+
+ if (bytesRead >= cut) {
+ outputValue.set(bytesRead);
+ context.write(key, outputValue);
+ System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+ // reset bytesRead
+ bytesRead = 0;
+ }
+
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ if (lastKey != null) {
+ outputValue.set(bytesRead);
+ context.write(lastKey, outputValue);
+ System.out.println(StringUtils.byteToHexString(lastKey.getBytes()) + "\t" + outputValue.get());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java
new file mode 100644
index 0000000..718c188
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
+
+ @SuppressWarnings("static-access")
+ protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(rowKeyStatsFilePath);
+
+ parseOptions(options, args);
+
+ String statsFilePath = getOptionValue(rowKeyStatsFilePath);
+
+ // start job
+ String jobName = getOptionValue(OPTION_JOB_NAME);
+ job = Job.getInstance(getConf(), jobName);
+
+ job.setJarByClass(this.getClass());
+
+ addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ FileOutputFormat.setOutputPath(job, output);
+
+ // Mapper
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(RowKeyDistributionCheckerMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+
+ // Reducer - only one
+ job.setReducerClass(RowKeyDistributionCheckerReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setNumReduceTasks(1);
+
+ job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
+
+ this.deletePath(job.getConfiguration(), output);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ printUsage(options);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
new file mode 100644
index 0000000..76e3f37
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RowKeyDistributionCheckerMapper extends Mapper<Text, Text, Text, LongWritable> {
+
+ String rowKeyStatsFilePath;
+ byte[][] splitKeys;
+ Map<Text, Long> resultMap;
+ List<Text> keyList;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
+ splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
+
+ resultMap = new HashMap<Text, Long>();
+ keyList = new ArrayList<Text>();
+ for (int i = 0; i < splitKeys.length; i++) {
+ Text key = new Text(splitKeys[i]);
+ resultMap.put(key, 0L);
+ keyList.add(new Text(splitKeys[i]));
+ }
+ }
+
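+ // Add each record's byte size to the first split-key bucket whose boundary
+ // is greater than the record's key.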
+ @Override
+ public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ for (Text t : keyList) {
+ if (key.compareTo(t) < 0) {
+ Long v = resultMap.get(t);
+ long length = key.getLength() + value.getLength();
+ v += length;
+ resultMap.put(t, v);
+ break;
+ }
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ LongWritable outputValue = new LongWritable();
+ for (Entry<Text, Long> kv : resultMap.entrySet()) {
+ outputValue.set(kv.getValue());
+ context.write(kv.getKey(), outputValue);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ public byte[][] getSplits(Configuration conf, Path path) {
+ List<byte[]> rowkeyList = new ArrayList<byte[]>();
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ byte[] tmp = ((Text) key).copyBytes();
+ if (rowkeyList.contains(tmp) == false) {
+ rowkeyList.add(tmp);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+
+ byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
+
+ return retValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
new file mode 100644
index 0000000..742f644
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * @author ysong1
+ *
+ */
+public class RowKeyDistributionCheckerReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+ LongWritable outputKey = new LongWritable(0L);
+
+ @Override
+ public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+
+ long length = 0;
+ for (LongWritable v : values) {
+ length += v.get();
+ }
+
+ outputKey.set(length);
+ context.write(key, outputKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java
new file mode 100644
index 0000000..51d893c
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author ysong1
+ */
+public class StorageCleanupJob extends AbstractHadoopJob {
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+
+ protected static final Logger log = LoggerFactory.getLogger(StorageCleanupJob.class);
+
+ boolean delete = false;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ try {
+ options.addOption(OPTION_DELETE);
+ parseOptions(options, args);
+
+ delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+
+ cleanUnusedHdfsFiles(conf);
+ cleanUnusedHBaseTables(conf);
+ cleanUnusedIntermediateHiveTable(conf);
+
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ }
+ }
+
+ private boolean isJobInUse(JobInstance job) {
+ JobStatusEnum status = job.getStatus();
+ return status.equals(JobStatusEnum.NEW) || status.equals(JobStatusEnum.PENDING) || status.equals(JobStatusEnum.RUNNING) || status.equals(JobStatusEnum.ERROR);
+ }
+
+
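+ // Collect HBase tables whose name carries this instance's storage prefix and whose
+ // metadata key points at this Kylin instance, but which are no longer referenced by
+ // any cube segment; drop them when the delete option is set, otherwise just list them.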
+ private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ // get all kylin hbase tables
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+ String tableNamePrefix = CubeManager.getHBaseStorageLocationPrefix();
+ HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+ List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+ for (HTableDescriptor desc : tableDescriptors) {
+ String host = desc.getValue(CubeManager.getHtableMetadataKey());
+ if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+ //only keep htables that belong to this Kylin instance
+ allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+ }
+ }
+
+ // remove every segment htable from drop list
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments()) {
+ String tablename = seg.getStorageLocationIdentifier();
+ allTablesNeedToBeDropped.remove(tablename);
+ log.info("Remove table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+ }
+ }
+
+ if (delete == true) {
+ // drop tables
+ for (String htableName : allTablesNeedToBeDropped) {
+ log.info("Deleting HBase table " + htableName);
+ if (hbaseAdmin.tableExists(htableName)) {
+ hbaseAdmin.disableTable(htableName);
+ hbaseAdmin.deleteTable(htableName);
+ log.info("Deleted HBase table " + htableName);
+ } else {
+ log.info("HBase table" + htableName + " does not exist");
+ }
+ }
+ } else {
+ System.out.println("--------------- Tables To Be Dropped ---------------");
+ for (String htableName : allTablesNeedToBeDropped) {
+ System.out.println(htableName);
+ }
+ System.out.println("----------------------------------------------------");
+ }
+
+ hbaseAdmin.close();
+ }
+
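+ // Collect per-job working directories under the HDFS working directory that belong
+ // neither to an unfinished job nor to any segment's last build job; delete them when
+ // the delete option is set, otherwise just list them.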
+ private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
+ JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ FileSystem fs = FileSystem.get(conf);
+ List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
+ // GlobFilter filter = new
+ // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ // + "/kylin-.*");
+ FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
+ for (FileStatus status : fStatus) {
+ String path = status.getPath().getName();
+ // System.out.println(path);
+ if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
+ String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + "/" + path;
+ allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
+ }
+ }
+
+ List<JobInstance> allJobs = JobDAO.getInstance(KylinConfig.getInstanceFromEnv()).listAllJobs();
+ for (JobInstance jobInstance : allJobs) {
+ // only remove FINISHED and DISCARDED job intermediate files
+ if (isJobInUse(jobInstance) == true) {
+ String path = JobInstance.getJobWorkingDir(jobInstance, engineConfig);
+ allHdfsPathsNeedToBeDeleted.remove(path);
+ log.info("Remove " + path + " from deletion list, as the path belongs to job " + jobInstance.getUuid() + " with status " + jobInstance.getStatus());
+ }
+ }
+
+ // remove every segment working dir from deletion list
+ for (CubeInstance cube : cubeMgr.listAllCubes()) {
+ for (CubeSegment seg : cube.getSegments()) {
+ String jobUuid = seg.getLastBuildJobID();
+ if (jobUuid != null && jobUuid.equals("") == false) {
+ String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
+ allHdfsPathsNeedToBeDeleted.remove(path);
+ log.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
+ }
+ }
+ }
+
+ if (delete == true) {
+ // remove files
+ for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+ log.info("Deleting hdfs path " + hdfsPath);
+ Path p = new Path(hdfsPath);
+ if (fs.exists(p) == true) {
+ fs.delete(p, true);
+ log.info("Deleted hdfs path " + hdfsPath);
+ } else {
+ log.info("Hdfs path " + hdfsPath + "does not exist");
+ }
+ }
+ } else {
+ System.out.println("--------------- HDFS Path To Be Deleted ---------------");
+ for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+ System.out.println(hdfsPath);
+ }
+ System.out.println("-------------------------------------------------------");
+ }
+
+ }
+
+ private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
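+ // no-op: intermediate Hive table cleanup is not implemented here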
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java b/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java
new file mode 100644
index 0000000..0ee1811
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.dict;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.cli.DictionaryGeneratorCLI;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author ysong1
+ *
+ */
+
+public class CreateDictionaryJob extends AbstractHadoopJob {
+
+ private int returnCode = 0;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_INPUT_PATH);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+
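+ // build the segment's dictionaries from the fact table column values under factColumnsInputPath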
+ DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, factColumnsInputPath);
+ } catch (Exception e) {
+ printUsage(options);
+ e.printStackTrace(System.err);
+ log.error(e.getLocalizedMessage(), e);
+ returnCode = 2;
+ }
+
+ return returnCode;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new CreateDictionaryJob(), args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
new file mode 100644
index 0000000..d75a4a9
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.dict;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+
+/**
+ * @author ysong1
+ *
+ */
+public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_INPUT_PATH);
+ parseOptions(options, args);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+ CubeManager mgr = CubeManager.getInstance(config);
+ CubeInstance cube = mgr.getCube(cubeName);
+ if (cube == null || !cube.isInvertedIndex())
+ throw new IllegalArgumentException("No Inverted Index Cube found by name " + cubeName);
+
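+ // build the inverted index dictionary for the cube's first segment from the fact column values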
+ mgr.buildInvertedIndexDictionary(cube.getFirstSegment(), factColumnsInputPath);
+ return 0;
+ } catch (Exception e) {
+ printUsage(options);
+ e.printStackTrace(System.err);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java
new file mode 100644
index 0000000..1037c70
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.hbase;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.HBaseColumnFamilyDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author ysong1
+ *
+ */
+public class BulkLoadJob extends AbstractHadoopJob {
+
+ protected static final Logger log = LoggerFactory.getLogger(BulkLoadJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_HTABLE_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ parseOptions(options, args);
+
+ String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
+ // e.g. /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
+ // the path must end with "/"
+ String input = getOptionValue(OPTION_INPUT_PATH);
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+ FileSystem fs = FileSystem.get(conf);
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeDesc cubeDesc = cube.getDescriptor();
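+ // open up permissions on each column family's HFile directory so the HBase user can read and move the files during bulk load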
+ FsPermission permission = new FsPermission((short) 0777);
+ for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ String cfName = cf.getName();
+ fs.setPermission(new Path(input + cfName), permission);
+ }
+
+ String[] newArgs = new String[2];
+ newArgs[0] = input;
+ newArgs[1] = tableName;
+
+ log.debug("Start to run LoadIncrementalHFiles");
+ int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
+ log.debug("End to run LoadIncrementalHFiles");
+ return ret;
+ } catch (Exception e) {
+ printUsage(options);
+ e.printStackTrace(System.err);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new BulkLoadJob(), args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
new file mode 100644
index 0000000..4cb20cb
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.job.hadoop.AbstractHadoopJob;
+import com.kylinolap.job.tools.DeployCoprocessorCLI;
+import com.kylinolap.job.tools.LZOSupportnessChecker;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.HBaseColumnFamilyDesc;
+
+/**
+ * @author George Song (ysong1)
+ */
+
+public class CreateHTableJob extends AbstractHadoopJob {
+
+ protected static final Logger log = LoggerFactory.getLogger(CreateHTableJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_PARTITION_FILE_PATH);
+ options.addOption(OPTION_HTABLE_NAME);
+ parseOptions(options, args);
+
+ Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
+
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeDesc cubeDesc = cube.getDescriptor();
+
+ String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
+ HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+ // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
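+ // pre-split regions should only split again when a store file exceeds the configured max region size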
+ tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+ tableDesc.setValue(CubeManager.getHtableMetadataKey(), config.getMetadataUrlPrefix());
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ try {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ // add coprocessor for bulk load
+ tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+ }
+
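+ // create one HBase column family for each column family declared in the cube's HBase mapping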
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
+ cf.setMaxVersions(1);
+
+ if (LZOSupportnessChecker.getSupportness()) {
+ log.info("hbase will use lzo to compress data");
+ cf.setCompressionType(Algorithm.LZO);
+ } else {
+ log.info("hbase will not use lzo to compress data");
+ }
+
+ cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+ cf.setInMemory(false);
+ cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
+ tableDesc.addFamily(cf);
+ }
+
+ byte[][] splitKeys = getSplits(conf, partitionFilePath);
+
+ if (admin.tableExists(tableName)) {
+ // admin.disableTable(tableName);
+ // admin.deleteTable(tableName);
+ throw new RuntimeException("HBase table " + tableName + " exists!");
+ }
+
+ try {
+ initHTableCoprocessor(tableDesc);
+ log.info("hbase table " + tableName + " deployed with coprocessor.");
+
+ } catch (Exception ex) {
+ log.error("Error deploying coprocessor on " + tableName, ex);
+ log.error("Will try creating the table without coprocessor.");
+ }
+
+ admin.createTable(tableDesc, splitKeys);
+ log.info("create hbase table " + tableName + " done.");
+
+ return 0;
+ } catch (Exception e) {
+ printUsage(options);
+ e.printStackTrace(System.err);
+ log.error(e.getLocalizedMessage(), e);
+ return 2;
+ } finally {
+ admin.close();
+ }
+ }
+
+ private void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration hconf = HadoopUtil.getDefaultConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+
+ String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+ Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+ DeployCoprocessorCLI.setCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+ }
+
+ @SuppressWarnings("deprecation")
+ public byte[][] getSplits(Configuration conf, Path path) throws Exception {
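+ // read region split keys from the partition sequence file; each distinct key becomes a split point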
+ List<byte[]> rowkeyList = new ArrayList<byte[]>();
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ byte[] tmp = ((Text) key).copyBytes();
+ // List.contains() compares byte[] by reference, so check content equality to skip duplicate split keys
+ boolean duplicate = false;
+ for (byte[] existing : rowkeyList) {
+ if (Arrays.equals(existing, tmp)) { duplicate = true; break; }
+ }
+ if (!duplicate) {
+ rowkeyList.add(tmp);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to read region split keys from " + path, e);
+ throw e;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+
+ byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
+ if (retValue.length == 0) {
+ throw new IllegalStateException("Split number is 0; partition file " + path + " contains no records.");
+ }
+
+ return retValue;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new CreateHTableJob(), args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java b/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java
new file mode 100644
index 0000000..2fdce86
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.hive;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.cuboid.Cuboid;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.FunctionDesc;
+import com.kylinolap.metadata.model.cube.MeasureDesc;
+import com.kylinolap.metadata.model.cube.TblColRef;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class JoinedFlatTableDesc {
+
+ private String tableName;
+ private final CubeDesc cubeDesc;
+ private final CubeSegment cubeSegment;
+
+ private int[] rowKeyColumnIndexes; // index of each rowkey column on the flat table
+ private int[][] measureColumnIndexes; // [i] holds the flat table column indexes of the i-th measure's parameters
+
+ public JoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ this.cubeDesc = cubeDesc;
+ this.cubeSegment = cubeSegment;
+ parseCubeDesc();
+ }
+
+ /**
+ * @return the cubeSegment
+ */
+ public CubeSegment getCubeSegment() {
+ return cubeSegment;
+ }
+
+ private List<IntermediateColumnDesc> columnList = new ArrayList<IntermediateColumnDesc>();
+
+ public List<IntermediateColumnDesc> getColumnList() {
+ return columnList;
+ }
+
+ // check what columns from hive tables are required, and index them
+ private void parseCubeDesc() {
+ int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+ if (cubeSegment == null) {
+ this.tableName = "kylin_intermediate_" + cubeDesc.getName();
+ } else {
+ this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
+ }
+
+ Map<String, Integer> dimensionIndexMap = new HashMap<String, Integer>();
+ int columnIndex = 0;
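+ // dimension columns (excluding derived) come first on the flat table, in cube descriptor order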
+ for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) {
+ dimensionIndexMap.put(col.getName(), columnIndex);
+ columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col.getName(), col.getDatatype(), col.getTable()));
+ columnIndex++;
+ }
+
+ // build index
+ List<TblColRef> cuboidColumns = baseCuboid.getColumns();
+ rowKeyColumnIndexes = new int[rowkeyColCount];
+ for (int i = 0; i < rowkeyColCount; i++) {
+ String colName = cuboidColumns.get(i).getName();
+ Integer dimIdx = dimensionIndexMap.get(colName);
+ if (dimIdx == null) {
+ throw new RuntimeException("Can't find column " + colName);
+ }
+ rowKeyColumnIndexes[i] = dimIdx;
+ }
+
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
+ int measureSize = measures.size();
+ measureColumnIndexes = new int[measureSize][];
+ for (int i = 0; i < measureSize; i++) {
+ FunctionDesc func = measures.get(i).getFunction();
+ List<TblColRef> colRefs = func.getParameter().getColRefs();
+ if (colRefs == null) {
+ measureColumnIndexes[i] = null;
+ } else {
+ measureColumnIndexes[i] = new int[colRefs.size()];
+ for (int j = 0; j < colRefs.size(); j++) {
+ TblColRef c = colRefs.get(j);
+ measureColumnIndexes[i][j] = contains(columnList, c);
+ if (measureColumnIndexes[i][j] < 0) {
+ measureColumnIndexes[i][j] = columnIndex;
+ columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c.getName(), c.getDatatype(), c.getTable()));
+ columnIndex++;
+ }
+ }
+ }
+ }
+ }
+
+ private int contains(List<IntermediateColumnDesc> columnList, TblColRef c) {
+ for (int i = 0; i < columnList.size(); i++) {
+ IntermediateColumnDesc col = columnList.get(i);
+ if (col.getColumnName().equals(c.getName()) && col.getTableName().equals(c.getTable()))
+ return i;
+ }
+ return -1;
+ }
+
+ public CubeDesc getCubeDesc() {
+ return cubeDesc;
+ }
+
+ public String getTableName(String jobUUID) {
+ return tableName + "_" + jobUUID.replace("-", "_");
+ }
+
+ public int[] getRowKeyColumnIndexes() {
+ return rowKeyColumnIndexes;
+ }
+
+ public int[][] getMeasureColumnIndexes() {
+ return measureColumnIndexes;
+ }
+
+ public static class IntermediateColumnDesc {
+ private String id;
+ private String columnName;
+ private String dataType;
+ private String tableName;
+
+ public IntermediateColumnDesc(String id, String columnName, String dataType, String tableName) {
+ this.id = id;
+ this.columnName = columnName;
+ this.dataType = dataType;
+ this.tableName = tableName;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java b/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java
new file mode 100644
index 0000000..294ffae
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop.hive;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+public class SqlHiveDataTypeMapping {
+
+ private static final Map<String, String> sqlToHiveDataTypeMapping = new HashMap<String, String>();
+
+ static {
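+ // only SQL types whose Hive name differs need an entry; all other types map to themselves (lower-cased)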
+ sqlToHiveDataTypeMapping.put("short", "smallint");
+ sqlToHiveDataTypeMapping.put("long", "bigint");
+ sqlToHiveDataTypeMapping.put("byte", "tinyint");
+ sqlToHiveDataTypeMapping.put("datetime", "date");
+ }
+
+ public static String getHiveDataType(String sqlDataType) {
+ String hiveDataType = sqlToHiveDataTypeMapping.get(sqlDataType.toLowerCase());
+ if (hiveDataType == null) {
+ hiveDataType = sqlDataType;
+ }
+ return hiveDataType.toLowerCase();
+ }
+}