Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/21 05:58:31 UTC
kylin git commit: KYLIN-1542 change all hfile to 777 before bulk loading
Repository: kylin
Updated Branches:
refs/heads/master f76190fe9 -> 618a66073
KYLIN-1542 change all hfile to 777 before bulk loading
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/618a6607
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/618a6607
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/618a6607
Branch: refs/heads/master
Commit: 618a6607348479e46dced04983c438d98366fd31
Parents: f76190f
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 11:58:13 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 11:58:22 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/spark/SparkCubing.java | 199 ++++++++++---------
.../kylin/storage/hbase/ii/IIBulkLoadJob.java | 21 +-
.../kylin/storage/hbase/steps/BulkLoadJob.java | 34 +---
3 files changed, 124 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
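The substance of this commit: before bulk loading, each job called FileSystem.setPermission (which is not recursive) on individual column-family directories, and two of the jobs had to load the cube descriptor just to enumerate those directories. The new code runs an in-process FsShell and recursively opens the whole HFile output directory instead. A minimal sketch of the new step, assuming a hypothetical helper class and an already-written HFile directory at hfileLocation:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;

    public class HFilePermissionFixer {
        // Equivalent to `hadoop fs -chmod -R 777 <hfileLocation>`, so the
        // HBase user performing the bulk load can read and move every HFile.
        public static void chmod777Recursive(Configuration conf, String hfileLocation) throws IOException {
            FsShell shell = new FsShell(conf);
            try {
                shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }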
http://git-wip-us.apache.org/repos/asf/kylin/blob/618a6607/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index fc7f4b1..ef35067 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -17,12 +17,26 @@
*/
package org.apache.kylin.engine.spark;
-import com.google.common.base.*;
-import com.google.common.collect.*;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.primitives.UnsignedBytes;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -30,8 +44,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -40,10 +54,10 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -54,18 +68,22 @@ import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.*;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.dict.*;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
import org.apache.kylin.engine.spark.util.IteratorUtils;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -80,30 +98,37 @@ import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.primitives.UnsignedBytes;
/**
*/
public class SparkCubing extends AbstractApplication {
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
+
private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
@@ -250,53 +275,47 @@ public class SparkCubing extends AbstractApplication {
row_hashcodes[i] = new ByteArray();
}
- final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
- new Function2<HashMap<Long, HyperLogLogPlusCounter>,
- List<String>,
- HashMap<Long, HyperLogLogPlusCounter>>() {
+ final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HyperLogLogPlusCounter>, List<String>, HashMap<Long, HyperLogLogPlusCounter>>() {
- final HashFunction hashFunction = Hashing.murmur3_128();
+ final HashFunction hashFunction = Hashing.murmur3_128();
- @Override
- public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hashFunction.newHasher();
- String colValue = v2.get(rowKeyColumnIndexes[i]);
- if (colValue != null) {
- row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
- } else {
- row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
- }
- }
-
- for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
- Hasher hc = hashFunction.newHasher();
- HyperLogLogPlusCounter counter = v1.get(entry.getKey());
- final Integer[] cuboidBitSet = entry.getValue();
- for (int position = 0; position < cuboidBitSet.length; position++) {
- hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
- }
- counter.add(hc.hash().asBytes());
- }
- return v1;
+ @Override
+ public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hashFunction.newHasher();
+ String colValue = v2.get(rowKeyColumnIndexes[i]);
+ if (colValue != null) {
+ row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
}
- },
- new Function2<HashMap<Long, HyperLogLogPlusCounter>,
- HashMap<Long, HyperLogLogPlusCounter>,
- HashMap<Long, HyperLogLogPlusCounter>>() {
- @Override
- public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, HashMap<Long, HyperLogLogPlusCounter> v2) throws Exception {
- Preconditions.checkArgument(v1.size() == v2.size());
- Preconditions.checkArgument(v1.size() > 0);
- for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
- final HyperLogLogPlusCounter counter1 = entry.getValue();
- final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
- counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
- }
- return v1;
+ }
+
+ for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+ Hasher hc = hashFunction.newHasher();
+ HyperLogLogPlusCounter counter = v1.get(entry.getKey());
+ final Integer[] cuboidBitSet = entry.getValue();
+ for (int position = 0; position < cuboidBitSet.length; position++) {
+ hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
}
+ counter.add(hc.hash().asBytes());
+ }
+ return v1;
+ }
+ }, new Function2<HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>>() {
+ @Override
+ public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, HashMap<Long, HyperLogLogPlusCounter> v2) throws Exception {
+ Preconditions.checkArgument(v1.size() == v2.size());
+ Preconditions.checkArgument(v1.size() > 0);
+ for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
+ final HyperLogLogPlusCounter counter1 = entry.getValue();
+ final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
+ counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+ }
+ return v1;
+ }
- });
+ });
return samplingResult;
}
@@ -329,14 +348,13 @@ public class SparkCubing extends AbstractApplication {
}
for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- FunctionDesc func = measureDesc.getFunction();
- List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
- for (TblColRef col : colRefs) {
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
+ FunctionDesc func = measureDesc.getFunction();
+ List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
+ for (TblColRef col : colRefs) {
+ dictionaryMap.put(col, cubeSegment.getDictionary(col));
+ }
}
-
final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
@Override
@@ -357,7 +375,7 @@ public class SparkCubing extends AbstractApplication {
blockingQueue.put(row);
}
}
- blockingQueue.put(Collections.<String>emptyList());
+ blockingQueue.put(Collections.<String> emptyList());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -478,19 +496,15 @@ public class SparkCubing extends AbstractApplication {
private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
- FileSystem fs = FileSystem.get(hbaseConf);
- FsPermission permission = new FsPermission((short) 0777);
- for (HBaseColumnFamilyDesc cf : cubeDesc.getHbaseMapping().getColumnFamily()) {
- String cfName = cf.getName();
- Path columnFamilyPath = new Path(hfileLocation, cfName);
-
- // File may have already been auto-loaded (in the case of MapR DB)
- if (fs.exists(columnFamilyPath)) {
- fs.setPermission(columnFamilyPath, permission);
- }
+
+ FsShell shell = new FsShell(hbaseConf);
+ try {
+ shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
+ } catch (Exception e) {
+ logger.error("Couldnt change the file permissions ", e);
+ throw new IOException(e);
}
String[] newArgs = new String[2];
@@ -539,8 +553,7 @@ public class SparkCubing extends AbstractApplication {
kyroClasses.add(java.math.RoundingMode.class);
kyroClasses.add(java.util.ArrayList.class);
kyroClasses.add(java.util.LinkedList.class);
-
-
+
ArrayList<String> result = Lists.newArrayList();
for (Class kyroClass : kyroClasses) {
result.add(kyroClass.getName());
@@ -562,14 +575,12 @@ public class SparkCubing extends AbstractApplication {
//serialization conf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrationRequired", "true");
- final Iterable<String> allClasses = Iterables.filter(
- Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()),
- new Predicate<String>() {
- @Override
- public boolean apply(@Nullable String input) {
- return input != null && input.trim().length() > 0;
- }
- });
+ final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String input) {
+ return input != null && input.trim().length() > 0;
+ }
+ });
System.out.println("kyro classes:" + allClasses.toString());
conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ","));
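With permissions opened, bulkLoadHFile simply hands the directory to HBase's completebulkload tool (LoadIncrementalHFiles), as the unchanged tail of that method shows. A minimal end-to-end sketch of the pattern now shared by the jobs in this commit, reusing the hypothetical HFilePermissionFixer above; the path and table name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadSketch {
        public static int load(String hfileLocation, String tableName) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Open up the generated HFiles first (see sketch above) ...
            HFilePermissionFixer.chmod777Recursive(conf, hfileLocation);
            // ... then let HBase move them into the target table's regions.
            return ToolRunner.run(new LoadIncrementalHFiles(conf), new String[] { hfileLocation, tableName });
        }
    }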
http://git-wip-us.apache.org/repos/asf/kylin/blob/618a6607/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
index 8cf921a..22c8ec6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
@@ -18,15 +18,14 @@
package org.apache.kylin.storage.hbase.ii;
+import java.io.IOException;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.storage.hbase.HBaseConnection;
/**
@@ -47,14 +46,12 @@ public class IIBulkLoadJob extends AbstractHadoopJob {
String input = getOptionValue(OPTION_INPUT_PATH);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- FileSystem fs = FileSystem.get(conf);
-
- Path columnFamilyPath = new Path(input, IIDesc.HBASE_FAMILY);
-
- // File may have already been auto-loaded (in the case of MapR DB)
- if (fs.exists(columnFamilyPath)) {
- FsPermission permission = new FsPermission((short) 0777);
- fs.setPermission(columnFamilyPath, permission);
+ FsShell shell = new FsShell(conf);
+ try {
+ shell.run(new String[] { "-chmod", "-R", "777", input });
+ } catch (Exception e) {
+ logger.error("Couldn't change the file permissions ", e);
+ throw new IOException(e);
}
return ToolRunner.run(new LoadIncrementalHFiles(conf), new String[] { input, tableName });
http://git-wip-us.apache.org/repos/asf/kylin/blob/618a6607/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 3d99b98..f43a03a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -18,18 +18,13 @@
package org.apache.kylin.storage.hbase.steps;
+import java.io.IOException;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
@@ -48,7 +43,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
Options options = new Options();
try {
-
+
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_CUBE_NAME);
@@ -61,21 +56,12 @@ public class BulkLoadJob extends AbstractHadoopJob {
String input = getOptionValue(OPTION_INPUT_PATH);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- FileSystem fs = FileSystem.get(conf);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
- FsPermission permission = new FsPermission((short) 0777);
- for (HBaseColumnFamilyDesc cf : cubeDesc.getHbaseMapping().getColumnFamily()) {
- String cfName = cf.getName();
- Path columnFamilyPath = new Path(input, cfName);
-
- // File may have already been auto-loaded (in the case of MapR DB)
- if (fs.exists(columnFamilyPath)) {
- fs.setPermission(columnFamilyPath, permission);
- }
+ FsShell shell = new FsShell(conf);
+ try {
+ shell.run(new String[] { "-chmod", "-R", "777", input });
+ } catch (Exception e) {
+ logger.error("Couldn't change the file permissions ", e);
+ throw new IOException(e);
}
String[] newArgs = new String[2];
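Net effect: SparkCubing.bulkLoadHFile, IIBulkLoadJob, and BulkLoadJob now share the same recursive-chmod-then-load sequence, and BulkLoadJob no longer needs to resolve the cube descriptor at all. The recursive chmod also subsumes the old per-column-family existence check for the MapR DB auto-load case, since "-chmod -R" simply applies to whatever is present under the output directory.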