Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/10 07:48:12 UTC
[19/54] [abbrv] [partial] incubator-kylin git commit: cleanup for migration from github.com
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
deleted file mode 100644
index 627c397..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidMapper.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.BytesUtil;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.common.RowKeySplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.dict.Dictionary;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author ysong1, honma
- */
-public class MergeCuboidMapper extends Mapper<Text, Text, Text, Text> {
-
- private KylinConfig config;
- private String cubeName;
- private String segmentName;
- private CubeManager cubeManager;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment; // must be unique during a mapper's life cycle
-
- private Text outputKey = new Text();
-
- private byte[] newKeyBuf;
- private RowKeySplitter rowKeySplitter;
-
- private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
- private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
- private Boolean checkNeedMerging(TblColRef col) throws IOException {
- Boolean ret = dictsNeedMerging.get(col);
- if (ret != null)
- return ret;
- else {
- ret = cubeDesc.getRowkey().isUseDictionary(col) && cubeDesc.getFactTable().equalsIgnoreCase((String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc, col, null)[0]);
- dictsNeedMerging.put(col, ret);
- return ret;
- }
- }
-
- private String extractJobIDFromPath(String path) {
- Matcher matcher = JOB_NAME_PATTERN.matcher(path);
- // check the first occurrence
- if (matcher.find()) {
- return matcher.group(1);
- } else {
- throw new IllegalStateException("Can not extract job ID from file path : " + path);
- }
- }
-
- private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
- for (CubeSegment segment : cubeInstance.getSegments()) {
- if (segment.getUuid().equalsIgnoreCase(jobID)) {
- return segment;
- }
- }
-
- throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
- config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- cubeManager = CubeManager.getInstance(config);
- cube = cubeManager.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
-
- // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- newKeyBuf = new byte[256];// size will auto-grow
-
- // decide which source segment
- org.apache.hadoop.mapreduce.InputSplit inputSplit = context.getInputSplit();
- String filePath = ((FileSplit) inputSplit).getPath().toString();
- String jobID = extractJobIDFromPath(filePath);
- sourceCubeSegment = findSegmentWithUuid(jobID, cube);
-
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes(), key.getLength());
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
- SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
- int bufOffset = 0;
- BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
- bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
- for (int i = 0; i < cuboid.getColumns().size(); ++i) {
- TblColRef col = cuboid.getColumns().get(i);
-
- if (this.checkNeedMerging(col)) {
- // if dictionary on fact table column, needs rewrite
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
- while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBuf;
- newKeyBuf = new byte[2 * newKeyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
- }
-
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
- int idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
-
- bufOffset += mergedDict.getSizeOfId();
- } else {
- // keep as it is
- while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBuf;
- newKeyBuf = new byte[2 * newKeyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
- }
-
- System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
- bufOffset += splittedByteses[i + 1].length;
- }
- }
- byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
- outputKey.set(newKey, 0, newKey.length);
-
- context.write(outputKey, value);
- }
-}
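
The heart of MergeCuboidMapper is the dictionary translation above: a column
value encoded with the source segment's dictionary must be decoded back to its
raw value and re-encoded with the merged segment's dictionary, since the two
dictionaries assign different IDs. Below is a minimal standalone sketch of that
remapping; it uses plain Strings and ints where the real
com.kylinolap.dict.Dictionary works on bytes and fixed-width IDs.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of MergeCuboidMapper's ID translation: decode with the source
    // segment's dictionary, re-encode with the merged segment's.
    public class DictRemapSketch {

        static class Dict {
            final Map<Integer, String> idToValue = new HashMap<>();
            final Map<String, Integer> valueToId = new HashMap<>();

            void put(int id, String value) {
                idToValue.put(id, value);
                valueToId.put(value, id);
            }
        }

        static int remap(int sourceId, Dict source, Dict merged) {
            String value = source.idToValue.get(sourceId); // decode with the old dict
            return merged.valueToId.get(value);            // re-encode with the new dict
        }

        public static void main(String[] args) {
            Dict source = new Dict();
            source.put(0, "US");
            Dict merged = new Dict(); // built over the union of all segments' values
            merged.put(0, "CN");
            merged.put(1, "US");
            System.out.println(remap(0, source, merged)); // prints 1
        }
    }

The buffer-doubling loop in map() exists for the same reason: the decoded value
and the re-encoded ID can be wider than what the source key held, so newKeyBuf
grows on demand.
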
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java
deleted file mode 100644
index e06ef46..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidJob.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-public class NDCuboidJob extends CuboidJob {
-
- public NDCuboidJob() {
- this.setMapperClass(NDCuboidMapper.class);
- }
-
- public static void main(String[] args) throws Exception {
- CuboidJob job = new NDCuboidJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
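
NDCuboidJob only selects the mapper class; the actual job wiring lives in
CuboidJob, and main() follows Hadoop's standard Tool/ToolRunner entry-point
pattern. A generic skeleton of that pattern (the class name here is
illustrative):

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // ToolRunner strips generic Hadoop options (-D key=value, -files, ...)
    // from args before handing the remainder to run().
    public class MyHadoopJob extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            // configure and submit the MapReduce job here, return its exit code
            return 0;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new MyHadoopJob(), args));
        }
    }
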
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java
deleted file mode 100644
index b1f08b0..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/NDCuboidMapper.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.common.RowKeySplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.cube.cuboid.CuboidScheduler;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class NDCuboidMapper extends Mapper<Text, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(NDCuboidMapper.class);
-
- private Text outputKey = new Text();
- private String cubeName;
- private String segmentName;
- private CubeDesc cubeDesc;
- private CuboidScheduler cuboidScheduler;
-
- private int handleCounter;
- private int skipCounter;
-
- private byte[] keyBuf = new byte[4096];
- private RowKeySplitter rowKeySplitter;
-
- @Override
- protected void setup(Context context) throws IOException {
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeSegment cubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
- cubeDesc = cube.getDescriptor();
-
- // initialize CuboidScheduler
- cuboidScheduler = new CuboidScheduler(cubeDesc);
-
- rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
- }
-
- private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
- int offset = 0;
-
- // cuboid id
- System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
- offset += childCuboid.getBytes().length;
-
- // rowkey columns
- long mask = Long.highestOneBit(parentCuboid.getId());
- long parentCuboidId = parentCuboid.getId();
- long childCuboidId = childCuboid.getId();
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
- int index = 1; // skip cuboidId
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & parentCuboidId) > 0) {// if the this bit position equals
- // 1
- if ((mask & childCuboidId) > 0) {// if the child cuboid has this
- // column
- System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length);
- offset += splitBuffers[index].length;
- }
- index++;
- }
- mask = mask >> 1;
- }
-
- return offset;
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
- Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
-
- Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
-
- // no children to spawn from this cuboid
- if (myChildren == null || myChildren.isEmpty()) {
- context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Skipped records").increment(1L);
- skipCounter++;
- if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Skipped " + skipCounter + " records!");
- }
- return;
- }
-
- context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Processed records").increment(1L);
-
- handleCounter++;
- if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + handleCounter + " records!");
- }
-
- for (Long child : myChildren) {
- Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
- int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
- outputKey.set(keyBuf, 0, keyLength);
- context.write(outputKey, value);
- }
-
- }
-}
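
buildKey() walks the parent cuboid ID from its highest bit downward: every set
bit in the parent corresponds to one rowkey column in the split buffers, and a
column is copied into the child key only when the child cuboid keeps that bit.
The same bit walk in isolation, over Strings instead of byte buffers:

    import java.util.ArrayList;
    import java.util.List;

    // Project a parent cuboid's rowkey columns onto a child cuboid whose ID
    // is a bit-subset of the parent's. parentColumns holds the parent's
    // column values in rowkey order.
    public class CuboidProjectionSketch {

        static List<String> project(long parentId, long childId, List<String> parentColumns) {
            List<String> childColumns = new ArrayList<>();
            long mask = Long.highestOneBit(parentId);
            int index = 0; // position within parentColumns
            while (mask != 0) {
                if ((mask & parentId) != 0) {    // the parent has this dimension
                    if ((mask & childId) != 0) { // and the child keeps it
                        childColumns.add(parentColumns.get(index));
                    }
                    index++;
                }
                mask >>= 1;
            }
            return childColumns;
        }

        public static void main(String[] args) {
            // parent 0b111 has columns [A, B, C]; child 0b101 drops the middle one
            System.out.println(project(0b111, 0b101, List.of("A", "B", "C"))); // [A, C]
        }
    }
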
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java
deleted file mode 100644
index 03ea2a0..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/NewBaseCuboidMapper.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.Array;
-import com.kylinolap.common.util.ByteArray;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.cube.kv.AbstractRowKeyEncoder;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.dict.lookup.HiveTable;
-import com.kylinolap.dict.lookup.LookupBytesTable;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.MetadataManager;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.DimensionDesc;
-import com.kylinolap.metadata.model.cube.FunctionDesc;
-import com.kylinolap.metadata.model.cube.JoinDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-import com.kylinolap.metadata.model.cube.ParameterDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-import com.kylinolap.metadata.model.schema.TableDesc;
-
-/**
- * @author George Song (ysong1), honma
- */
-public class NewBaseCuboidMapper<KEYIN> extends Mapper<KEYIN, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(NewBaseCuboidMapper.class);
-
- private String cubeName;
- private String segmentName;
- private Cuboid baseCuboid;
- private CubeInstance cube;
- private CubeSegment cubeSegment;
-
- private CubeDesc cubeDesc;
- private MetadataManager metadataManager;
- private TableDesc factTableDesc;
-
- private boolean byteRowDelimiterInferred = false;
- private byte byteRowDelimiter;
-
- private int counter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- private Object[] measures;
- private byte[][] keyBytesBuf;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- private BytesSplitter bytesSplitter;
- private AbstractRowKeyEncoder rowKeyEncoder;
- private MeasureCodec measureCodec;
-
- // deal with table join
- private HashMap<String, LookupBytesTable> lookupTables;// name -> table
- private LinkedList<TableJoin> tableJoins;
- private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// similar as
- // TableJoin.dimTblColAsRowKey
- private int[][] measureColumnIndice;
- private byte[] nullValue;
-
- private class TableJoin {
- public LinkedList<Integer> fkIndice;// zero-based join columns on fact
- // table
- public String lookupTableName;
- public String joinType;
-
- // Pair.first -> zero-based column index in lookup table
- // Pair.second -> zero based row key index
- public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey;
-
- private TableJoin(String joinType, LinkedList<Integer> fkIndice, String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) {
- this.joinType = joinType;
- this.fkIndice = fkIndice;
- this.lookupTableName = lookupTableName;
- this.dimTblColAsRowKey = dimTblColAsRowKey;
- }
- }
-
- @Override
- protected void setup(Context context) throws IOException {
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- metadataManager = MetadataManager.getInstance(config);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
- cubeDesc = cube.getDescriptor();
- factTableDesc = metadataManager.getTableDesc(cubeDesc.getFactTable());
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- // intermediateTableDesc = new
- // JoinedFlatTableDesc(cube.getDescriptor());
-
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- keyBytesBuf = new byte[colCount][];
-
- bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 4096);
-
- nullValue = new byte[] { (byte) '\\', (byte) 'N' }; // as in Hive, null is represented by \N
-
- prepareJoins();
- prepareMetrics();
- }
-
- private void prepareJoins() throws IOException {
- this.lookupTables = new HashMap<String, LookupBytesTable>();
- this.tableJoins = new LinkedList<TableJoin>();
- this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
-
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- JoinDesc join = dim.getJoin();
- if (join != null) {
- String joinType = join.getType().toUpperCase();
- String lookupTableName = dim.getTable();
-
- // load lookup tables
- if (!lookupTables.containsKey(lookupTableName)) {
- HiveTable htable = new HiveTable(metadataManager, lookupTableName);
- LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable);
- lookupTables.put(lookupTableName, btable);
- }
-
- // create join infos
- LinkedList<Integer> fkIndice = new LinkedList<Integer>();
- for (TblColRef colRef : join.getForeignKeyColumns()) {
- fkIndice.add(colRef.getColumn().getZeroBasedIndex());
- }
- this.tableJoins.add(new TableJoin(joinType, fkIndice, lookupTableName, this.findColumnRowKeyRelationships(dim)));
-
- } else {
-
- this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim));
- }
- }
-
- // put composite keys joins ahead of single key joins
- Collections.sort(tableJoins, new Comparator<TableJoin>() {
- @Override
- public int compare(TableJoin o1, TableJoin o2) {
- return Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size()));
- }
- });
- }
-
- private LinkedList<Pair<Integer, Integer>> findColumnRowKeyRelationships(DimensionDesc dim) {
- LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
- for (TblColRef colRef : dim.getColumnRefs()) {
- int dimTableIndex = colRef.getColumn().getZeroBasedIndex();
- int rowKeyIndex = cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName());
- dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, rowKeyIndex));
- }
- return dimTblColAsRowKey;
- }
-
- private void prepareMetrics() {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- int measureSize = measures.size();
- measureColumnIndice = new int[measureSize][];
- for (int i = 0; i < measureSize; i++) {
- FunctionDesc func = measures.get(i).getFunction();
- List<TblColRef> colRefs = func.getParameter().getColRefs();
- if (colRefs == null) {
- measureColumnIndice[i] = null;
- } else {
- measureColumnIndice[i] = new int[colRefs.size()];
- for (int j = 0; j < colRefs.size(); j++) {
- TblColRef c = colRefs.get(j);
- int factTblIdx = factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex();
- measureColumnIndice[i][j] = factTblIdx;
- }
- }
- }
- }
-
- private byte[] trimSplitBuffer(SplittedBytes splittedBytes) {
- return Arrays.copyOf(splittedBytes.value, splittedBytes.length);
- }
-
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
-
- int filledDimension = 0;// debug
-
- // join lookup tables, and fill into RowKey the columns in lookup table
- for (TableJoin tableJoin : this.tableJoins) {
- String dimTblName = tableJoin.lookupTableName;
- LookupBytesTable dimTbl = this.lookupTables.get(dimTblName);
- ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()];
- for (int i = 0; i < tableJoin.fkIndice.size(); ++i) {
- rawKey[i] = new ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)]));
- }
- Array<ByteArray> key = new Array<ByteArray>(rawKey);
- ByteArray[] dimRow = dimTbl.getRow(key);
- if (dimRow == null) {
- if (tableJoin.joinType.equalsIgnoreCase("INNER")) {
- return null;
- } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) {
- for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = nullValue;
- filledDimension++;
- }
- }
- } else {
- for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].data;
- filledDimension++;
- }
- }
- }
-
- // fill into RowKey the columns in fact table
- for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = trimSplitBuffer(splitBuffers[relation.getFirst()]);
- filledDimension++;
- }
-
- assert filledDimension == keyBytesBuf.length;
-
- // all the row key slots(keyBytesBuf) should be complete now
- return rowKeyEncoder.encode(keyBytesBuf);
- }
-
- private void buildValue(SplittedBytes[] splitBuffers) {
-
- for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
- }
-
- valueBuf.clear();
- measureCodec.encode(measures, valueBuf);
- }
-
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- ParameterDesc paramDesc = desc.getFunction().getParameter();
- int[] flatTableIdx = this.measureColumnIndice[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // column values
- else {
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- result = Arrays.copyOf(split.value, split.length);
- }
- }
-
- if (desc.getFunction().isCount()) {
- result = Bytes.toBytes("1");
- }
-
- return result;
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- // combining the hive table flattening logic into base cuboid building.
- // the input of this mapper is the fact table rows
-
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
- if (!byteRowDelimiterInferred)
- byteRowDelimiter = bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), factTableDesc.getColumns().length);
-
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-
- try {
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- if (rowKey == null)
- return;// skip this fact table row
-
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
-
- } catch (Throwable t) {
- logger.error("", t);
- context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Error records").increment(1L);
- return;
- }
- }
-}
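
NewBaseCuboidMapper flattens the star schema inside the mapper itself: each
fact row probes every lookup table by its foreign key, a hit copies the
dimension columns into rowkey slots, a miss drops the row for an INNER join or
fills Hive-style \N bytes for a LEFT join. The branch logic in a compact,
String-typed sketch:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the join handling in buildKey(): probe a lookup table by
    // foreign key and fill a rowkey slot according to the join type.
    public class LookupJoinSketch {

        static final String NULL_VALUE = "\\N"; // Hive's textual null

        // Returns false when an INNER join misses and the fact row must be skipped.
        static boolean fillSlot(Map<String, String> lookup, String fk,
                                String joinType, String[] rowKeySlots, int slot) {
            String dimValue = lookup.get(fk);
            if (dimValue == null) {
                if ("INNER".equalsIgnoreCase(joinType)) {
                    return false;               // unmatched inner join: drop the row
                }
                rowKeySlots[slot] = NULL_VALUE; // unmatched left join: null-fill
            } else {
                rowKeySlots[slot] = dimValue;
            }
            return true;
        }

        public static void main(String[] args) {
            Map<String, String> country = new HashMap<>();
            country.put("US", "United States");
            String[] slots = new String[1];
            System.out.println(fillSlot(country, "XX", "LEFT", slots, 0));  // true
            System.out.println(slots[0]);                                   // \N
            System.out.println(fillSlot(country, "XX", "INNER", slots, 0)); // false
        }
    }
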
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java
deleted file mode 100644
index 8245266..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
-
-/**
- * @author xjiang, ysong1
- *
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
- protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
-
- parseOptions(options, args);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- File jarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- if (jarFile.exists()) {
- job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
- } else {
- job.setJarByClass(this.getClass());
- }
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- // job.getConfiguration().set("dfs.block.size", "67108864");
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RangeKeyDistributionMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RangeKeyDistributionReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeCapacity cubeCapacity = cube.getDescriptor().getCapacity();
- job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, cubeCapacity.toString());
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java
deleted file mode 100644
index f02ae1a..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionMapper extends Mapper<Text, Text, Text, LongWritable> {
-
- private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
- private LongWritable outputValue = new LongWritable(0);
-
- private long bytesRead = 0;
-
- private Text lastKey;
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- lastKey = key;
-
- int bytesLength = key.getLength() + value.getLength();
- bytesRead += bytesLength;
-
- if (bytesRead >= ONE_MEGA_BYTES) {
- outputValue.set(bytesRead);
- context.write(key, outputValue);
-
- // reset bytesRead
- bytesRead = 0;
- }
-
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- }
- }
-
-}
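
The mapper samples the sorted rowkey stream: it adds up key+value sizes and
emits the current key once roughly every megabyte has passed, with cleanup()
flushing the remainder against the last key seen. The same pattern in
isolation:

    // Emit a (key, bytesSinceLastEmit) marker whenever the running byte
    // count crosses a threshold; flush the tail when the stream ends.
    public class ByteSamplerSketch {

        static final long THRESHOLD = 1024L * 1024L; // 1 MB, as in the mapper

        private long bytesRead = 0;
        private String lastKey;

        void onRecord(String key, int recordBytes) {
            lastKey = key;
            bytesRead += recordBytes;
            if (bytesRead >= THRESHOLD) {
                System.out.println(key + "\t" + bytesRead); // stands in for context.write()
                bytesRead = 0;
            }
        }

        void onClose() {
            if (lastKey != null) {
                System.out.println(lastKey + "\t" + bytesRead); // the tail
            }
        }

        public static void main(String[] args) {
            ByteSamplerSketch s = new ByteSamplerSketch();
            for (int i = 0; i < 2500; i++) {
                s.onRecord(String.format("key%05d", i), 1024); // ~2.5 MB total
            }
            s.onClose(); // two 1 MB markers plus a ~0.5 MB tail
        }
    }
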
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
deleted file mode 100644
index dafea36..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.StringUtils;
-
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
-
- public static final long FIVE_GIGA_BYTES = 5L * 1024L * 1024L * 1024L;
- public static final long TEN_GIGA_BYTES = 10L * 1024L * 1024L * 1024L;
- public static final long TWENTY_GIGA_BYTES = 20L * 1024L * 1024L * 1024L;
-
- private LongWritable outputValue = new LongWritable(0);
-
- private long bytesRead = 0;
- private Text lastKey;
-
- private CubeCapacity cubeCapacity;
- private long cut;
-
- @Override
- protected void setup(Context context) throws IOException {
- cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
- switch (cubeCapacity) {
- case SMALL:
- cut = FIVE_GIGA_BYTES;
- break;
- case MEDIUM:
- cut = TEN_GIGA_BYTES;
- break;
- case LARGE:
- cut = TWENTY_GIGA_BYTES;
- break;
- }
- }
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
- lastKey = key;
- long length = 0;
- for (LongWritable v : values) {
- length += v.get();
- }
-
- bytesRead += length;
-
- if (bytesRead >= cut) {
- outputValue.set(bytesRead);
- context.write(key, outputValue);
- System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
- // reset bytesRead
- bytesRead = 0;
- }
-
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (lastKey != null) {
- outputValue.set(bytesRead);
- context.write(lastKey, outputValue);
- System.out.println(StringUtils.byteToHexString(lastKey.getBytes()) + "\t" + outputValue.get());
- }
- }
-}
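
The reducer repeats the mapper's accumulation at a much coarser cut chosen by
the cube's declared capacity, so the emitted keys land roughly every 5, 10 or
20 GB of data; in Kylin these keys typically go on to drive HBase table
pre-splitting, which is also why the job runs exactly one reducer — a single
reducer sees the complete key order. The capacity-to-cut mapping on its own:

    // The declared cube capacity picks how many bytes each emitted key
    // (a future region boundary) should cover.
    public class RegionCutSketch {

        enum CubeCapacity { SMALL, MEDIUM, LARGE }

        static long cutBytes(CubeCapacity capacity) {
            final long GB = 1024L * 1024L * 1024L;
            switch (capacity) {
                case SMALL:  return 5L * GB;
                case MEDIUM: return 10L * GB;
                case LARGE:  return 20L * GB;
                default: throw new IllegalArgumentException("unknown capacity: " + capacity);
            }
        }

        public static void main(String[] args) {
            System.out.println(cutBytes(CubeCapacity.MEDIUM)); // 10737418240
        }
    }
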
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index 718c188..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(rowKeyStatsFilePath);
-
- parseOptions(options, args);
-
- String statsFilePath = getOptionValue(rowKeyStatsFilePath);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- job.setJarByClass(this.getClass());
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RowKeyDistributionCheckerMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RowKeyDistributionCheckerReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- printUsage(options);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index 76e3f37..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerMapper extends Mapper<Text, Text, Text, LongWritable> {
-
- String rowKeyStatsFilePath;
- byte[][] splitKeys;
- Map<Text, Long> resultMap;
- List<Text> keyList;
-
- @Override
- protected void setup(Context context) throws IOException {
- rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
- splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
- resultMap = new HashMap<Text, Long>();
- keyList = new ArrayList<Text>();
- for (int i = 0; i < splitKeys.length; i++) {
- Text key = new Text(splitKeys[i]);
- resultMap.put(key, 0L);
- keyList.add(new Text(splitKeys[i]));
- }
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- for (Text t : keyList) {
- if (key.compareTo(t) < 0) {
- Long v = resultMap.get(t);
- long length = key.getLength() + value.getLength();
- v += length;
- resultMap.put(t, v);
- break;
- }
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- LongWritable outputValue = new LongWritable();
- for (Entry<Text, Long> kv : resultMap.entrySet()) {
- outputValue.set(kv.getValue());
- context.write(kv.getKey(), outputValue);
- }
- }
-
- @SuppressWarnings("deprecation")
- public byte[][] getSplits(Configuration conf, Path path) {
- List<byte[]> rowkeyList = new ArrayList<byte[]>();
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- byte[] tmp = ((Text) key).copyBytes();
- // List.contains() on byte[] compares references, never contents,
- // so dedupe by comparing the actual bytes
- boolean duplicate = false;
- for (byte[] existing : rowkeyList) {
- if (Arrays.equals(existing, tmp)) {
- duplicate = true;
- break;
- }
- }
- if (!duplicate) {
- rowkeyList.add(tmp);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeStream(reader);
- }
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
- return retValue;
- }
-}
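
setup() above builds a distinct list of split keys by hand (see the
content-equality fix in getSplits()); the same dedupe is simpler with a Set
keyed on content, since ByteBuffer.wrap() gives byte[] contents proper
equals()/hashCode() semantics:

    import java.nio.ByteBuffer;
    import java.util.LinkedHashSet;
    import java.util.Set;

    // byte[] only has identity equality, but ByteBuffer.wrap() compares the
    // wrapped contents, so a Set<ByteBuffer> works as a distinct-key filter.
    public class ByteKeyDedupSketch {
        public static void main(String[] args) {
            Set<ByteBuffer> distinct = new LinkedHashSet<>();
            byte[] a = {1, 2, 3};
            byte[] b = {1, 2, 3}; // same contents, different array object
            distinct.add(ByteBuffer.wrap(a));
            distinct.add(ByteBuffer.wrap(b));
            System.out.println(distinct.size()); // 1
        }
    }
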
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index 742f644..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
-
- LongWritable outputKey = new LongWritable(0L);
-
- @Override
- public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
- long length = 0;
- for (LongWritable v : values) {
- length += v.get();
- }
-
- outputKey.set(length);
- context.write(key, outputKey);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java
deleted file mode 100644
index 51d893c..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/StorageCleanupJob.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- */
-public class StorageCleanupJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-
- protected static final Logger log = LoggerFactory.getLogger(StorageCleanupJob.class);
-
- boolean delete = false;
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- try {
- options.addOption(OPTION_DELETE);
- parseOptions(options, args);
-
- delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-
- Configuration conf = HBaseConfiguration.create(getConf());
-
- cleanUnusedHdfsFiles(conf);
- cleanUnusedHBaseTables(conf);
- cleanUnusedIntermediateHiveTable(conf);
-
- return 0;
- } catch (Exception e) {
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- private boolean isJobInUse(JobInstance job) {
- JobStatusEnum status = job.getStatus();
- return status.equals(JobStatusEnum.NEW) || status.equals(JobStatusEnum.PENDING) || status.equals(JobStatusEnum.RUNNING) || status.equals(JobStatusEnum.ERROR);
- }
-
-
- private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- // get all kylin hbase tables
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
- String tableNamePrefix = CubeManager.getHBaseStorageLocationPrefix();
- HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
- List<String> allTablesNeedToBeDropped = new ArrayList<String>();
- for (HTableDescriptor desc : tableDescriptors) {
- String host = desc.getValue(CubeManager.getHtableMetadataKey());
- if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
- // only handle htables that belong to this Kylin instance
- allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
- }
- }
-
- // remove every segment htable from drop list
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments()) {
- String tablename = seg.getStorageLocationIdentifier();
- allTablesNeedToBeDropped.remove(tablename);
- log.info("Remove table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
- }
- }
-
- if (delete) {
- // drop tables
- for (String htableName : allTablesNeedToBeDropped) {
- log.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- hbaseAdmin.disableTable(htableName);
- hbaseAdmin.deleteTable(htableName);
- log.info("Deleted HBase table " + htableName);
- } else {
- log.info("HBase table" + htableName + " does not exist");
- }
- }
- } else {
- System.out.println("--------------- Tables To Be Dropped ---------------");
- for (String htableName : allTablesNeedToBeDropped) {
- System.out.println(htableName);
- }
- System.out.println("----------------------------------------------------");
- }
-
- hbaseAdmin.close();
- }
-
- private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
- JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- FileSystem fs = FileSystem.get(conf);
- List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
- // GlobFilter filter = new
- // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
- // + "/kylin-.*");
- FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
- for (FileStatus status : fStatus) {
- String path = status.getPath().getName();
- // System.out.println(path);
- if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
- String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + "/" + path;
- allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
- }
- }
-
- List<JobInstance> allJobs = JobDAO.getInstance(KylinConfig.getInstanceFromEnv()).listAllJobs();
- for (JobInstance jobInstance : allJobs) {
- // only remove FINISHED and DISCARDED job intermediate files
- if (isJobInUse(jobInstance)) {
- String path = JobInstance.getJobWorkingDir(jobInstance, engineConfig);
- allHdfsPathsNeedToBeDeleted.remove(path);
- log.info("Remove " + path + " from deletion list, as the path belongs to job " + jobInstance.getUuid() + " with status " + jobInstance.getStatus());
- }
- }
-
- // remove every segment working dir from deletion list
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments()) {
- String jobUuid = seg.getLastBuildJobID();
- if (jobUuid != null && jobUuid.equals("") == false) {
- String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
- allHdfsPathsNeedToBeDeleted.remove(path);
- log.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
- }
- }
- }
-
- if (delete) {
- // remove files
- for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
- log.info("Deleting hdfs path " + hdfsPath);
- Path p = new Path(hdfsPath);
- if (fs.exists(p)) {
- fs.delete(p, true);
- log.info("Deleted hdfs path " + hdfsPath);
- } else {
- log.info("Hdfs path " + hdfsPath + "does not exist");
- }
- }
- } else {
- System.out.println("--------------- HDFS Path To Be Deleted ---------------");
- for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
- System.out.println(hdfsPath);
- }
- System.out.println("-------------------------------------------------------");
- }
-
- }
-
- private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
- // not implemented yet; intermediate Hive tables are left untouched
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
- System.exit(exitCode);
- }
-}
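
Both cleanup passes above share a mark-and-sweep shape: list every candidate
resource (HBase tables with the Kylin prefix, kylin-* working directories),
remove everything still referenced by a live cube segment or an unfinished
job, then delete or merely print the remainder depending on the -delete flag.
Schematically:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    // Mark-and-sweep skeleton: start from all candidates, subtract what is
    // still in use, act on the rest.
    public class CleanupSketch {

        static List<String> findGarbage(Collection<String> allResources, Collection<String> inUse) {
            List<String> garbage = new ArrayList<>(allResources);
            garbage.removeAll(inUse);
            return garbage;
        }

        public static void main(String[] args) {
            List<String> all = List.of("kylin-job-1", "kylin-job-2", "kylin-job-3");
            List<String> inUse = List.of("kylin-job-2");
            boolean delete = false; // mirrors the -delete option
            for (String r : findGarbage(all, inUse)) {
                System.out.println((delete ? "deleting " : "would delete ") + r);
            }
        }
    }
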
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java b/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java
deleted file mode 100644
index 0ee1811..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateDictionaryJob.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.dict;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.cli.DictionaryGeneratorCLI;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-
-public class CreateDictionaryJob extends AbstractHadoopJob {
-
- private int returnCode = 0;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_INPUT_PATH);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
- String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, factColumnsInputPath);
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- returnCode = 2;
- }
-
- return returnCode;
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateDictionaryJob(), args);
- System.exit(exitCode);
- }
-
-}
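CreateDictionaryJob is a thin CLI wrapper: it parses three options and delegates to DictionaryGeneratorCLI.processSegment. A hedged invocation sketch follows; the concrete flag strings are defined by the OPTION_* constants in AbstractHadoopJob, which is not part of this hunk, so the flag names below are assumptions.

    import org.apache.hadoop.util.ToolRunner;

    import com.kylinolap.job.hadoop.dict.CreateDictionaryJob;

    public class RunCreateDictionary {
        public static void main(String[] args) throws Exception {
            // Flag names assumed; check AbstractHadoopJob's OPTION_* constants.
            String[] jobArgs = { "-cubename", "test_cube",
                                 "-segmentname", "FULL_BUILD",
                                 "-input", "/tmp/kylin_fact_columns" };
            System.exit(ToolRunner.run(new CreateDictionaryJob(), jobArgs));
        }
    }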
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
deleted file mode 100644
index d75a4a9..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.dict;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME);
- String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- CubeManager mgr = CubeManager.getInstance(config);
- CubeInstance cube = mgr.getCube(cubeName);
- if (cube == null || !cube.isInvertedIndex())
- throw new IllegalArgumentException("No Inverted Index Cube found by name " + cubeName);
-
- mgr.buildInvertedIndexDictionary(cube.getFirstSegment(), factColumnsInputPath);
- return 0;
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java
deleted file mode 100644
index 1037c70..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/hbase/BulkLoadJob.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.hbase;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.HBaseColumnFamilyDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- *
- */
-public class BulkLoadJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(BulkLoadJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_CUBE_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- // e.g. /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
- // (the path must end with "/")
- String input = getOptionValue(OPTION_INPUT_PATH);
-
- Configuration conf = HBaseConfiguration.create(getConf());
- FileSystem fs = FileSystem.get(conf);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
- FsPermission permission = new FsPermission((short) 0777);
- for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
- String cfName = cf.getName();
- fs.setPermission(new Path(input + cfName), permission);
- }
-
- String[] newArgs = new String[2];
- newArgs[0] = input;
- newArgs[1] = tableName;
-
- log.debug("Start to run LoadIncrementalHFiles");
- int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
- log.debug("End to run LoadIncrementalHFiles");
- return ret;
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new BulkLoadJob(), args);
- System.exit(exitCode);
- }
-}
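The core of BulkLoadJob is just two steps: make the per-column-family HFile directories accessible to the HBase region servers, then hand the HFile root and the table name to LoadIncrementalHFiles. A condensed sketch under those assumptions (column family names and paths are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadSketch {
        public static int load(String hfileDir, String table, String[] columnFamilies) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            FileSystem fs = FileSystem.get(conf);
            // Region servers move the HFiles in place, so they must be accessible to the hbase user.
            FsPermission open = new FsPermission((short) 0777);
            for (String cf : columnFamilies) {
                fs.setPermission(new Path(hfileDir, cf), open);
            }
            return ToolRunner.run(new LoadIncrementalHFiles(conf), new String[] { hfileDir, table });
        }
    }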
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
deleted file mode 100644
index 4cb20cb..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/hbase/CreateHTableJob.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.tools.DeployCoprocessorCLI;
-import com.kylinolap.job.tools.LZOSupportnessChecker;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.HBaseColumnFamilyDesc;
-
-/**
- * @author George Song (ysong1)
- */
-
-public class CreateHTableJob extends AbstractHadoopJob {
-
- protected static final Logger log = LoggerFactory.getLogger(CreateHTableJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_PARTITION_FILE_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
- tableDesc.setValue(CubeManager.getHtableMetadataKey(), config.getMetadataUrlPrefix());
-
- Configuration conf = HBaseConfiguration.create(getConf());
- HBaseAdmin admin = new HBaseAdmin(conf);
-
- try {
- if (User.isHBaseSecurityEnabled(conf)) {
- // add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
- cf.setMaxVersions(1);
-
- if (LZOSupportnessChecker.getSupportness()) {
- log.info("HBase will use LZO to compress data");
- cf.setCompressionType(Algorithm.LZO);
- } else {
- log.info("HBase will not use LZO to compress data");
- }
-
- cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- cf.setInMemory(false);
- cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
- tableDesc.addFamily(cf);
- }
-
- byte[][] splitKeys = getSplits(conf, partitionFilePath);
-
- if (admin.tableExists(tableName)) {
- // admin.disableTable(tableName);
- // admin.deleteTable(tableName);
- throw new RuntimeException("HBase table " + tableName + " exists!");
- }
-
- try {
- initHTableCoprocessor(tableDesc);
- log.info("coprocessor attached to descriptor of hbase table " + tableName);
-
- } catch (Exception ex) {
- log.error("Error deploying coprocessor on " + tableName, ex);
- log.warn("Will try creating the table without coprocessor.");
- }
-
- admin.createTable(tableDesc, splitKeys);
- log.info("create hbase table " + tableName + " done.");
-
- return 0;
- } catch (Exception e) {
- printUsage(options);
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- } finally {
- admin.close();
- }
- }
-
- private void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getDefaultConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
-
- String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
- Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
- DeployCoprocessorCLI.setCoprocessorOnHTable(desc, hdfsCoprocessorJar);
- }
-
- @SuppressWarnings("deprecation")
- public byte[][] getSplits(Configuration conf, Path path) throws Exception {
- List<byte[]> rowkeyList = new ArrayList<byte[]>();
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- byte[] tmp = ((Text) key).copyBytes();
- // List.contains() compares byte[] by reference, so check array contents explicitly
- boolean seen = false;
- for (byte[] k : rowkeyList)
- seen = seen || java.util.Arrays.equals(k, tmp);
- if (!seen) {
- rowkeyList.add(tmp);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- } finally {
- IOUtils.closeStream(reader);
- }
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
- if (retValue.length == 0) {
- throw new IllegalStateException("Split number is 0, no records in cube??");
- }
-
- return retValue;
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CreateHTableJob(), args);
- System.exit(exitCode);
- }
-}
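One caveat with getSplits(): HBase expects the split keys passed to createTable() to be sorted and free of duplicates. The partition file written by the range-partitioning step normally guarantees this already, but a defensive normalization is cheap. A sketch using HBase's unsigned byte-array comparator:

    import java.util.List;
    import java.util.TreeSet;

    import org.apache.hadoop.hbase.util.Bytes;

    public class SplitKeySketch {
        /** Sorts and deduplicates raw split keys so createTable() will accept them. */
        public static byte[][] normalize(List<byte[]> rawKeys) {
            TreeSet<byte[]> sorted = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
            sorted.addAll(rawKeys); // the comparator drops content-equal duplicates
            return sorted.toArray(new byte[sorted.size()][]);
        }
    }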
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java b/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java
deleted file mode 100644
index 2fdce86..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableDesc.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.hive;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.FunctionDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author George Song (ysong1)
- */
-public class JoinedFlatTableDesc {
-
- private String tableName;
- private final CubeDesc cubeDesc;
- private final CubeSegment cubeSegment;
-
- private int[] rowKeyColumnIndexes; // flat table column index of each row key column
- private int[][] measureColumnIndexes; // [i] holds the flat table column indexes
- // of the i-th measure's parameter columns
-
- public JoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
- this.cubeDesc = cubeDesc;
- this.cubeSegment = cubeSegment;
- parseCubeDesc();
- }
-
- /**
- * @return the cubeSegment
- */
- public CubeSegment getCubeSegment() {
- return cubeSegment;
- }
-
- private List<IntermediateColumnDesc> columnList = new ArrayList<IntermediateColumnDesc>();
-
- public List<IntermediateColumnDesc> getColumnList() {
- return columnList;
- }
-
- // check what columns from hive tables are required, and index them
- private void parseCubeDesc() {
- int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- if (cubeSegment == null) {
- this.tableName = "kylin_intermediate_" + cubeDesc.getName();
- } else {
- this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
- }
-
- Map<String, Integer> dimensionIndexMap = new HashMap<String, Integer>();
- int columnIndex = 0;
- for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) {
- dimensionIndexMap.put(col.getName(), columnIndex);
- columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col.getName(), col.getDatatype(), col.getTable()));
- columnIndex++;
- }
-
- // build index
- List<TblColRef> cuboidColumns = baseCuboid.getColumns();
- rowKeyColumnIndexes = new int[rowkeyColCount];
- for (int i = 0; i < rowkeyColCount; i++) {
- String colName = cuboidColumns.get(i).getName();
- Integer dimIdx = dimensionIndexMap.get(colName);
- if (dimIdx == null) {
- throw new RuntimeException("Can't find column " + colName);
- }
- rowKeyColumnIndexes[i] = dimIdx;
- }
-
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- int measureSize = measures.size();
- measureColumnIndexes = new int[measureSize][];
- for (int i = 0; i < measureSize; i++) {
- FunctionDesc func = measures.get(i).getFunction();
- List<TblColRef> colRefs = func.getParameter().getColRefs();
- if (colRefs == null) {
- measureColumnIndexes[i] = null;
- } else {
- measureColumnIndexes[i] = new int[colRefs.size()];
- for (int j = 0; j < colRefs.size(); j++) {
- TblColRef c = colRefs.get(j);
- measureColumnIndexes[i][j] = contains(columnList, c);
- if (measureColumnIndexes[i][j] < 0) {
- measureColumnIndexes[i][j] = columnIndex;
- columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c.getName(), c.getDatatype(), c.getTable()));
- columnIndex++;
- }
- }
- }
- }
- }
-
- private int contains(List<IntermediateColumnDesc> columnList, TblColRef c) {
- for (int i = 0; i < columnList.size(); i++) {
- IntermediateColumnDesc col = columnList.get(i);
- if (col.getColumnName().equals(c.getName()) && col.getTableName().equals(c.getTable()))
- return i;
- }
- return -1;
- }
-
- public CubeDesc getCubeDesc() {
- return cubeDesc;
- }
-
- public String getTableName(String jobUUID) {
- return tableName + "_" + jobUUID.replace("-", "_");
- }
-
- public int[] getRowKeyColumnIndexes() {
- return rowKeyColumnIndexes;
- }
-
- public int[][] getMeasureColumnIndexes() {
- return measureColumnIndexes;
- }
-
- public static class IntermediateColumnDesc {
- private String id;
- private String columnName;
- private String dataType;
- private String tableName;
-
- public IntermediateColumnDesc(String id, String columnName, String dataType, String tableName) {
- this.id = id;
- this.columnName = columnName;
- this.dataType = dataType;
- this.tableName = tableName;
- }
-
- public String getId() {
- return id;
- }
-
- public String getColumnName() {
- return columnName;
- }
-
- public String getDataType() {
- return dataType;
- }
-
- public String getTableName() {
- return tableName;
- }
- }
-}
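To make the index layout of JoinedFlatTableDesc concrete, here is a small worked example with a hypothetical cube (two dimensions, one measure):

    // Hypothetical cube: dimensions CAL_DT and SITE_ID, measure SUM(PRICE).
    // Flat table columns:    0 -> CAL_DT, 1 -> SITE_ID, 2 -> PRICE (appended for the measure)
    // rowKeyColumnIndexes:   { 0, 1 }    // base cuboid column order mapped to flat table positions
    // measureColumnIndexes:  { { 2 } }   // SUM(PRICE) reads flat table column 2

Dimension columns are indexed first, in listDimensionColumnsExcludingDerived() order; measure parameter columns are appended only when they are not already present as dimensions.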
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java b/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java
deleted file mode 100644
index 294ffae..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/hive/SqlHiveDataTypeMapping.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.hive;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class SqlHiveDataTypeMapping {
-
- private static final Map<String, String> sqlToHiveDataTypeMapping = new HashMap<String, String>();
-
- static {
- sqlToHiveDataTypeMapping.put("short", "smallint");
- sqlToHiveDataTypeMapping.put("long", "bigint");
- sqlToHiveDataTypeMapping.put("byte", "tinyint");
- sqlToHiveDataTypeMapping.put("datetime", "date");
- }
-
- public static String getHiveDataType(String javaDataType) {
- String hiveDataType = sqlToHiveDataTypeMapping.get(javaDataType.toLowerCase());
- if (hiveDataType == null) {
- hiveDataType = javaDataType;
- }
- return hiveDataType.toLowerCase();
- }
-}
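The mapping above is best understood by its edge cases: known SQL types are translated, and anything unmapped passes through unchanged except for lowercasing. A small demo (class name is illustrative), with expected outputs derived directly from getHiveDataType() above:

    import com.kylinolap.job.hadoop.hive.SqlHiveDataTypeMapping;

    public class TypeMappingDemo {
        public static void main(String[] args) {
            System.out.println(SqlHiveDataTypeMapping.getHiveDataType("LONG"));     // bigint
            System.out.println(SqlHiveDataTypeMapping.getHiveDataType("datetime")); // date
            System.out.println(SqlHiveDataTypeMapping.getHiveDataType("VARCHAR"));  // varchar (unmapped, lowercased)
        }
    }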