Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/21 05:58:31 UTC

kylin git commit: KYLIN-1542 change all HFiles to 777 before bulk loading

Repository: kylin
Updated Branches:
  refs/heads/master f76190fe9 -> 618a66073


KYLIN-1542 change all HFiles to 777 before bulk loading


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/618a6607
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/618a6607
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/618a6607

Branch: refs/heads/master
Commit: 618a6607348479e46dced04983c438d98366fd31
Parents: f76190f
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 11:58:13 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 11:58:22 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/spark/SparkCubing.java  | 199 ++++++++++---------
 .../kylin/storage/hbase/ii/IIBulkLoadJob.java   |  21 +-
 .../kylin/storage/hbase/steps/BulkLoadJob.java  |  34 +---
 3 files changed, 124 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
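
In short: the old code set a 0777 FsPermission only on each column family directory itself (FileSystem.setPermission is not recursive), while the new code shells out to Hadoop's FsShell and runs a recursive chmod over the whole HFile location before handing it to LoadIncrementalHFiles, so every HFile underneath gets the permission too. A minimal sketch of the new pattern shared by the three changed call sites (the path argument here is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;

    public class ChmodHFilesSketch {

        // Recursively open up the HFile output directory so the HBase user
        // can read and move the files during bulk load. FileSystem's
        // setPermission() only touches the path it is given; "-chmod -R"
        // descends into every file and subdirectory.
        public static void chmod777(Configuration conf, String hfileLocation) throws IOException {
            FsShell shell = new FsShell(conf);
            try {
                shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
            } catch (Exception e) {
                throw new IOException("Couldn't change the file permissions", e);
            }
        }
    }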


http://git-wip-us.apache.org/repos/asf/kylin/blob/618a6607/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index fc7f4b1..ef35067 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -17,12 +17,26 @@
 */
 package org.apache.kylin.engine.spark;
 
-import com.google.common.base.*;
-import com.google.common.collect.*;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.primitives.UnsignedBytes;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -30,8 +44,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -40,10 +54,10 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -54,18 +68,22 @@ import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.*;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.dict.*;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -80,30 +98,37 @@ import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import scala.Tuple2;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.primitives.UnsignedBytes;
 
 /**
  */
 public class SparkCubing extends AbstractApplication {
 
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
+
     private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
     private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
@@ -250,53 +275,47 @@ public class SparkCubing extends AbstractApplication {
             row_hashcodes[i] = new ByteArray();
         }
 
-        final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
-                new Function2<HashMap<Long, HyperLogLogPlusCounter>,
-                        List<String>,
-                        HashMap<Long, HyperLogLogPlusCounter>>() {
+        final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HyperLogLogPlusCounter>, List<String>, HashMap<Long, HyperLogLogPlusCounter>>() {
 
-                    final HashFunction hashFunction = Hashing.murmur3_128();
+            final HashFunction hashFunction = Hashing.murmur3_128();
 
-                    @Override
-                    public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
-                        for (int i = 0; i < nRowKey; i++) {
-                            Hasher hc = hashFunction.newHasher();
-                            String colValue = v2.get(rowKeyColumnIndexes[i]);
-                            if (colValue != null) {
-                                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-                            } else {
-                                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-                            }
-                        }
-
-                        for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
-                            Hasher hc = hashFunction.newHasher();
-                            HyperLogLogPlusCounter counter = v1.get(entry.getKey());
-                            final Integer[] cuboidBitSet = entry.getValue();
-                            for (int position = 0; position < cuboidBitSet.length; position++) {
-                                hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
-                            }
-                            counter.add(hc.hash().asBytes());
-                        }
-                        return v1;
+            @Override
+            public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
+                for (int i = 0; i < nRowKey; i++) {
+                    Hasher hc = hashFunction.newHasher();
+                    String colValue = v2.get(rowKeyColumnIndexes[i]);
+                    if (colValue != null) {
+                        row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+                    } else {
+                        row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
                     }
-                },
-                new Function2<HashMap<Long, HyperLogLogPlusCounter>,
-                        HashMap<Long, HyperLogLogPlusCounter>,
-                        HashMap<Long, HyperLogLogPlusCounter>>() {
-                    @Override
-                    public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, HashMap<Long, HyperLogLogPlusCounter> v2) throws Exception {
-                        Preconditions.checkArgument(v1.size() == v2.size());
-                        Preconditions.checkArgument(v1.size() > 0);
-                        for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
-                            final HyperLogLogPlusCounter counter1 = entry.getValue();
-                            final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
-                            counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
-                        }
-                        return v1;
+                }
+
+                for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+                    Hasher hc = hashFunction.newHasher();
+                    HyperLogLogPlusCounter counter = v1.get(entry.getKey());
+                    final Integer[] cuboidBitSet = entry.getValue();
+                    for (int position = 0; position < cuboidBitSet.length; position++) {
+                        hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
                     }
+                    counter.add(hc.hash().asBytes());
+                }
+                return v1;
+            }
+        }, new Function2<HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>, HashMap<Long, HyperLogLogPlusCounter>>() {
+            @Override
+            public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, HashMap<Long, HyperLogLogPlusCounter> v2) throws Exception {
+                Preconditions.checkArgument(v1.size() == v2.size());
+                Preconditions.checkArgument(v1.size() > 0);
+                for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
+                    final HyperLogLogPlusCounter counter1 = entry.getValue();
+                    final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
+                    counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+                }
+                return v1;
+            }
 
-                });
+        });
         return samplingResult;
     }
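
The reformatted aggregate above is easier to follow once the two Function2 arguments are named: the first is Spark's seqOp, folding each partition's rows into that partition's map of per-cuboid HyperLogLog counters; the second is the combOp, merging two partitions' maps by merging counters key by key. A stripped-down sketch of the same aggregate contract with plain integers, runnable on a local master (all names here are illustrative):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    public class AggregateSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("agg").setMaster("local[2]"));
            Integer sum = sc.parallelize(Arrays.asList(1, 2, 3, 4)).aggregate(0,
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer acc, Integer row) {
                            return acc + row; // seqOp: fold one row into the per-partition accumulator
                        }
                    },
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer a, Integer b) {
                            return a + b; // combOp: merge accumulators from different partitions
                        }
                    });
            System.out.println(sum); // 10
            sc.stop();
        }
    }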
 
@@ -329,14 +348,13 @@ public class SparkCubing extends AbstractApplication {
         }
 
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-			FunctionDesc func = measureDesc.getFunction();
-			List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
-			for (TblColRef col : colRefs) {
-				dictionaryMap.put(col, cubeSegment.getDictionary(col));
-			}
+            FunctionDesc func = measureDesc.getFunction();
+            List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
+            for (TblColRef col : colRefs) {
+                dictionaryMap.put(col, cubeSegment.getDictionary(col));
+            }
         }
 
-
         final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
             @Override
@@ -357,7 +375,7 @@ public class SparkCubing extends AbstractApplication {
                             blockingQueue.put(row);
                         }
                     }
-                    blockingQueue.put(Collections.<String>emptyList());
+                    blockingQueue.put(Collections.<String> emptyList());
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -478,19 +496,15 @@ public class SparkCubing extends AbstractApplication {
     private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fs = FileSystem.get(hbaseConf);
-        FsPermission permission = new FsPermission((short) 0777);
-        for (HBaseColumnFamilyDesc cf : cubeDesc.getHbaseMapping().getColumnFamily()) {
-            String cfName = cf.getName();
-            Path columnFamilyPath = new Path(hfileLocation, cfName);
-
-            // File may have already been auto-loaded (in the case of MapR DB)
-            if (fs.exists(columnFamilyPath)) {
-                fs.setPermission(columnFamilyPath, permission);
-            }
+        
+        FsShell shell = new FsShell(hbaseConf);
+        try {
+            shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
+        } catch (Exception e) {
+            logger.error("Couldn't change the file permissions ", e);
+            throw new IOException(e);
         }
 
         String[] newArgs = new String[2];
@@ -539,8 +553,7 @@ public class SparkCubing extends AbstractApplication {
         kyroClasses.add(java.math.RoundingMode.class);
         kyroClasses.add(java.util.ArrayList.class);
         kyroClasses.add(java.util.LinkedList.class);
-        
-        
+
         ArrayList<String> result = Lists.newArrayList();
         for (Class kyroClass : kyroClasses) {
             result.add(kyroClass.getName());
@@ -562,14 +575,12 @@ public class SparkCubing extends AbstractApplication {
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrationRequired", "true");
-        final Iterable<String> allClasses = Iterables.filter(
-                Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()),
-                new Predicate<String>() {
-                    @Override
-                    public boolean apply(@Nullable String input) {
-                        return input != null && input.trim().length() > 0;
-                    }
-                });
+        final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                return input != null && input.trim().length() > 0;
+            }
+        });
         System.out.println("kyro classes:" + allClasses.toString());
         conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ","));
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/618a6607/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
index 8cf921a..22c8ec6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
@@ -18,15 +18,14 @@
 
 package org.apache.kylin.storage.hbase.ii;
 
+import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
 /**
@@ -47,14 +46,12 @@ public class IIBulkLoadJob extends AbstractHadoopJob {
             String input = getOptionValue(OPTION_INPUT_PATH);
 
             Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-            FileSystem fs = FileSystem.get(conf);
-
-            Path columnFamilyPath = new Path(input, IIDesc.HBASE_FAMILY);
-
-            // File may have already been auto-loaded (in the case of MapR DB)
-            if (fs.exists(columnFamilyPath)) {
-                FsPermission permission = new FsPermission((short) 0777);
-                fs.setPermission(columnFamilyPath, permission);
+            FsShell shell = new FsShell(conf);
+            try {
+                shell.run(new String[] { "-chmod", "-R", "777", input });
+            } catch (Exception e) {
+                logger.error("Couldn't change the file permissions ", e);
+                throw new IOException(e);
             }
 
             return ToolRunner.run(new LoadIncrementalHFiles(conf), new String[] { input, tableName });
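
As in the other two files, the job finishes by delegating to HBase's LoadIncrementalHFiles (the completebulkload tool), whose two positional arguments are the HFile directory and the target table. A self-contained sketch of that invocation, with a hypothetical path and table name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // args: <hfile directory> <table name> -- both hypothetical here
            int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
                    new String[] { "/tmp/kylin/hfiles", "KYLIN_CUBE_TABLE" });
            System.exit(exitCode);
        }
    }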

http://git-wip-us.apache.org/repos/asf/kylin/blob/618a6607/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 3d99b98..f43a03a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -18,18 +18,13 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
+import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
@@ -48,7 +43,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
         Options options = new Options();
 
         try {
-            
+
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_HTABLE_NAME);
             options.addOption(OPTION_CUBE_NAME);
@@ -61,21 +56,12 @@ public class BulkLoadJob extends AbstractHadoopJob {
             String input = getOptionValue(OPTION_INPUT_PATH);
 
             Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-            FileSystem fs = FileSystem.get(conf);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeDesc cubeDesc = cube.getDescriptor();
-            FsPermission permission = new FsPermission((short) 0777);
-            for (HBaseColumnFamilyDesc cf : cubeDesc.getHbaseMapping().getColumnFamily()) {
-                String cfName = cf.getName();
-                Path columnFamilyPath = new Path(input, cfName);
-
-                // File may have already been auto-loaded (in the case of MapR DB)
-                if (fs.exists(columnFamilyPath)) {
-                    fs.setPermission(columnFamilyPath, permission);
-                }
+            FsShell shell = new FsShell(conf);
+            try {
+                shell.run(new String[] { "-chmod", "-R", "777", input });
+            } catch (Exception e) {
+                logger.error("Couldn't change the file permissions ", e);
+                throw new IOException(e);
             }
 
             String[] newArgs = new String[2];