You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/18 11:52:30 UTC

[kylin] branch master updated (e7ab5df -> 7886a24)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from e7ab5df  KYLIN-4151 FileSplit ClassCastException in KafkaMRInput
     new 372f4dd  KYLIN-4325 fix TypeFlag for global dictionary field in SelfDefineSortableKey
     new 20353a3  KYLIN-4415 HTable Creation with Retry
     new 0327429  KYLIN-4416 Disable htable compaction
     new 981cc27  KYLIN-4414 add a property jobTempDir for AbstractHadoopJob to indicate the directory for all of the temporary files
     new 8d2a53f  KYLIN-4414 bulkload needs to follow locality
     new beb976a  KYLIN-4417 Use hash rather than random to avoid potential issue in ConvergeCuboidDataPartitioner
     new 7886a24  KYLIN-4418 Bug fix for ShardingHash.getShard in HBaseLookupRowEncoder

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 ++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |  48 ++++----
 .../mr/steps/ConvergeCuboidDataPartitioner.java    |  17 ++-
 .../engine/mr/steps/FactDistinctColumnsMapper.java |   6 +-
 .../hbase/lookup/HBaseLookupRowEncoder.java        |   3 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java |   6 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  15 ++-
 .../kylin/storage/hbase/steps/CubeHTableUtil.java  |  79 +++++++++++--
 .../storage/hbase/steps/HFileOutputFormat3.java    | 126 ++++++++++++++++++++-
 9 files changed, 251 insertions(+), 57 deletions(-)


[kylin] 01/07: KYLIN-4325 fix TypeFlag for global dictionary field in SelfDefineSortableKey

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 372f4ddf324dde8b1bc430710f7c3a09e7f30151
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 14:17:15 2020 +0800

    KYLIN-4325 fix TypeFlag for global dictionary field in SelfDefineSortableKey
---
 .../org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 2c3bc8d..0fa12f9 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -285,7 +285,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
         tmpbuf.put(valueBytes);
         outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-        sortableKey.init(outputKey, type);
+        if (cubeDesc.getDictionaryBuilderClass(allCols.get(colIndex)) == null) {
+            sortableKey.init(outputKey, type);
+        } else {
+            sortableKey.init(outputKey, (byte) 0);
+        }
         context.write(sortableKey, EMPTY_TEXT);
         // log a few rows for troubleshooting
         if (rowCount < 10) {


[kylin] 06/07: KYLIN-4417 Use hash rather than random to avoid potential issue in ConvergeCuboidDataPartitioner

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit beb976a0ef3a3a6c1d488d4e3beeefaef1f545ce
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 16:42:49 2020 +0800

    KYLIN-4417 Use hash rather than random to avoid potential issue in ConvergeCuboidDataPartitioner
---
 .../engine/mr/steps/ConvergeCuboidDataPartitioner.java  | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
index 605905a..3a31318 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.util.Random;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -28,10 +26,12 @@ import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
 import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 
 public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> implements Configurable {
 
-    private Random rand = new Random();
+    private static final HashFunction hashFunc = Hashing.murmur3_128();
 
     private Configuration conf;
     private boolean enableSharding;
@@ -40,12 +40,14 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple
 
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
+        long hash = hashFunc.hashBytes(key.getBytes()).asLong();
+
         long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
         // the first numReduceBaseCuboid are for base cuboid
         if (cuboidID == baseCuboidID) {
-            return rand.nextInt(numReduceBaseCuboid);
+            return getRemainder(hash, numReduceBaseCuboid);
         } else {
-            return numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid);
+            return numReduceBaseCuboid + getRemainder(hash, numReduceTasks - numReduceBaseCuboid);
         }
     }
 
@@ -64,4 +66,9 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple
     public Configuration getConf() {
         return conf;
     }
+
+    private static int getRemainder(long val, int base) {
+        int rem = (int) val % base;
+        return rem >= 0 ? rem : rem + base;
+    }
 }


[kylin] 02/07: KYLIN-4415 HTable Creation with Retry

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 20353a339054b71ae50386bf08397c780d2244e7
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 14:45:03 2020 +0800

    KYLIN-4415 HTable Creation with Retry
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  2 +-
 .../kylin/storage/hbase/steps/CubeHTableUtil.java  | 78 +++++++++++++++++++---
 3 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 16c07c1..7136a51 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1259,6 +1259,10 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    public int getHBaseHTableAvailableRetry() {
+        return Integer.parseInt(getOptional("kylin.storage.hbase.htable-available-retry", "3"));
+    }
+
     public int getHBaseRegionCountMin() {
         return Integer.parseInt(getOptional("kylin.storage.hbase.min-region-count", "1"));
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 4b2218b..7c970c2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -117,7 +117,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
                 partitionFilePath.getParent());
 
-        CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+        CubeHTableUtil.createHTable(cubeSegment, splitKeys, true);
 
         // export configuration in advance to avoid connecting to hbase from spark
         if (cubeDesc.getEngineType()== IEngineAware.ID_SPARK){
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index d06c993..369c7bc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -19,16 +19,20 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.io.IOException;
-
 import java.util.Locale;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -54,13 +58,14 @@ public class CubeHTableUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);
 
-    public static void createHTable(CubeSegment cubeSegment, byte[][] splitKeys) throws IOException {
-        String tableName = cubeSegment.getStorageLocationIdentifier();
+    public static void createHTable(CubeSegment cubeSegment, byte[][] splitKeys, boolean continueOnExists)
+            throws IOException {
+        TableName tableName = TableName.valueOf(cubeSegment.getStorageLocationIdentifier());
         CubeInstance cubeInstance = cubeSegment.getCubeInstance();
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         KylinConfig kylinConfig = cubeDesc.getConfig();
 
-        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(cubeSegment.getStorageLocationIdentifier()));
+        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
         tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
         tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
         tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
@@ -95,16 +100,69 @@ public class CubeHTableUtil {
                 tableDesc.addFamily(cf);
             }
 
-            if (admin.tableExists(TableName.valueOf(tableName))) {
-                // admin.disableTable(tableName);
-                // admin.deleteTable(tableName);
-                throw new RuntimeException("HBase table " + tableName + " exists!");
+            if (admin.tableExists(tableName)) {
+                if (!continueOnExists) {
+                    throw new RuntimeException("HBase table " + tableName.toString() + " exists!");
+                } else {
+                    logger.warn("HBase table " + tableName + " exists when create HTable, continue the process!");
+                    if (admin.isTableEnabled(tableName)) {
+                        try {
+                            admin.disableTable(tableName);
+                            logger.warn("Disabled existing enabled HBase table " + tableName.toString());
+                        } catch (TableNotEnabledException e) {
+                            logger.warn("HBase table " + tableName + " already disabled.", e);
+                        }
+                    } else {
+                        logger.warn("HBase table exists but in disabled state.");
+                    }
+                    try {
+                        admin.deleteTable(tableName);
+                        logger.info("Deleted existing HBase table " + tableName.toString());
+                    } catch (TableNotFoundException e) {
+                        logger.warn("HBase table " + tableName + " already deleted.", e);
+                    }
+                }
             }
 
             DeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
-            admin.createTable(tableDesc, splitKeys);
-            Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
+            try {
+                admin.createTable(tableDesc, splitKeys);
+            } catch (TableExistsException e) {
+                if (admin.isTableEnabled(tableName)) {
+                    logger.warn(
+                            "Duplicate create table request send to HMaster, ignore it and continue the process, table "
+                                    + tableName.toString());
+                } else {
+                    logger.warn(
+                            "Duplicate create table request send to HMaster, ignore it and continue enable the table "
+                                    + tableName.toString());
+                    admin.enableTable(tableName);
+                }
+            } catch (TimeoutIOException e) {
+                if (admin.isTableEnabled(tableName)) {
+                    logger.warn("False alerting?? Detect TimeoutIOException when creating HBase table "
+                            + tableName.toString(), e);
+                } else {
+                    throw e;
+                }
+            }
+
+            int availableRetry = kylinConfig.getHBaseHTableAvailableRetry();
+            while (availableRetry > 0) {
+                if (!admin.isTableAvailable(tableName)) {
+                    logger.warn("Table created but not available, wait for a while...");
+                    try {
+                        availableRetry--;
+                        Thread.sleep(60000);
+                    } catch (InterruptedException e) {
+                    }
+                } else {
+                    break;
+                }
+            }
+
+            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
             logger.info("create hbase table " + tableName + " done.");
         } finally {
             IOUtils.closeQuietly(admin);


[kylin] 04/07: KYLIN-4414 add a property jobTempDir for AbstractHadoopJob to indicate the directory for all of the temporary files

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 981cc273be31bb3099c3de34c78f88f986b0db8d
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 16:11:34 2020 +0800

    KYLIN-4414 add a property jobTempDir for AbstractHadoopJob to indicate the directory for all of the temporary files
---
 .../kylin/engine/mr/common/AbstractHadoopJob.java  | 48 ++++++++++------------
 1 file changed, 21 insertions(+), 27 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index fd4d413..411caa7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -157,6 +157,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected OptionsHelper optionsHelper = new OptionsHelper();
 
     protected Job job;
+    private File jobTempDir;
 
     public AbstractHadoopJob() {
         super(HadoopUtil.getCurrentConfiguration());
@@ -595,10 +596,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
     protected void dumpKylinPropsAndMetadata(String prj, Set<String> dumpList, KylinConfig kylinConfig,
             Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
+        File metaDir = new File(getJobTempDir(), "meta");
         metaDir.mkdirs();
 
         // write kylin.properties
@@ -628,33 +626,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     protected void cleanupTempConfFile(Configuration conf) {
-        String[] tempfiles = StringUtils.split(conf.get("tmpfiles"), ",");
-        if (tempfiles == null) {
-            return;
-        }
-        for (String tempMetaFileString : tempfiles) {
-            logger.trace("tempMetaFileString is : " + tempMetaFileString);
-            if (tempMetaFileString != null) {
-                if (tempMetaFileString.startsWith("file://")) {
-                    tempMetaFileString = tempMetaFileString.substring("file://".length());
-                    File tempMetaFile = new File(tempMetaFileString);
-                    if (tempMetaFile.exists()) {
-                        try {
-                            FileUtils.forceDelete(tempMetaFile.getParentFile());
-
-                        } catch (IOException e) {
-                            logger.warn("error when deleting " + tempMetaFile, e);
-                        }
-                    } else {
-                        logger.info("" + tempMetaFileString + " does not exist");
-                    }
-                } else {
-                    logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString);
-                }
+        String tmpFilesString = conf.get("tmpfiles");
+        logger.info("tmpFilesString is : " + tmpFilesString);
+        if (jobTempDir != null) {
+            try {
+                FileUtils.forceDelete(jobTempDir);
+            } catch (IOException e) {
+                logger.warn("error when deleting " + jobTempDir, e);
             }
         }
     }
 
+    // It's not thread safe
+    protected File getJobTempDir() throws IOException {
+        if (jobTempDir != null && jobTempDir.isDirectory()) {
+            return jobTempDir;
+        }
+        jobTempDir = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(jobTempDir); // we need a directory, so delete the file first
+
+        jobTempDir.mkdirs();
+        return jobTempDir;
+    }
+
     protected void deletePath(Configuration conf, Path path) throws IOException {
         HadoopUtil.deletePath(conf, path);
     }


[kylin] 05/07: KYLIN-4414 bulkload needs to follow locality

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 8d2a53fcd7711a843b3f34eae5953a8bfff3d3be
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 15:29:11 2020 +0800

    KYLIN-4414 bulkload needs to follow locality
---
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  15 ++-
 .../storage/hbase/steps/HFileOutputFormat3.java    | 126 ++++++++++++++++++++-
 2 files changed, 131 insertions(+), 10 deletions(-)

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index c0fae42..e403c20 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -18,8 +18,11 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
+import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
+
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Locale;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
@@ -47,8 +50,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
-
 /**
  * @author George Song (ysong1)
  */
@@ -77,7 +78,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
-            // use current hbase configuration
+            // construct configuration for the MR job cluster
             Configuration configuration = new Configuration(HBaseConnection.getCurrentHBaseConfiguration());
             String[] allServices = getAllServices(configuration);
             merge(configuration, getConf());
@@ -95,10 +96,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             attachCubeMetadata(cube, job.getConfiguration());
 
-            HTable htable = new HTable(configuration, getOptionValue(OPTION_HTABLE_NAME));
+            // construct configuration for the HBase cluster
+            Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+            HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase(Locale.ROOT));
 
             // Automatic config !
             HFileOutputFormat3.configureIncrementalLoad(job, htable);
+            HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir());
+
             reconfigurePartitions(configuration, partitionFilePath);
 
             job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -109,7 +114,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class);
 
             // set block replication to 3 for hfiles
-            configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 12c30ea..e14d012 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -17,8 +17,11 @@
 */
 package org.apache.kylin.storage.hbase.steps;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
@@ -39,15 +42,20 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -82,6 +90,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.RandomUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 
 /**
  * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788
@@ -114,6 +123,15 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
     // override the auto-detection of datablock encoding.
     public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+    /**
+     * Keep locality while generating HFiles for bulkload. See HBASE-12596
+     */
+    public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled";
+    private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+    private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
+
+    private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration";
+
     @Override
     public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
             throws IOException, InterruptedException {
@@ -150,6 +168,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             overriddenEncoding = null;
         }
 
+        final Configuration hConnectionConf = getConfigureHConnection(conf);
+        
         return new RecordWriter<ImmutableBytesWritable, V>() {
             // Map of families to writers and how much has been output on the writer.
             private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
@@ -178,7 +198,47 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                     rollWriters();
                 }
                 if (wl == null || wl.writer == null) {
-                    wl = getNewWriter(family, conf);
+                    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+                        HRegionLocation loc = null;
+                        String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+                        if (tableName != null) {
+                            try (Connection connection = ConnectionFactory.createConnection(hConnectionConf);
+                                    RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
+                                loc = locator.getRegionLocation(rowKey);
+                            } catch (Throwable e) {
+                                LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e);
+                                loc = null;
+                            }
+                        }
+
+                        if (null == loc) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("failed to get region location, so use default writer: " +
+                                        Bytes.toString(rowKey));
+                            }
+                            wl = getNewWriter(family, conf, null);
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+                            }
+                            InetSocketAddress initialIsa =
+                                    new InetSocketAddress(loc.getHostname(), loc.getPort());
+                            if (initialIsa.isUnresolved()) {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+                                            + loc.getPort() + ", so use default writer");
+                                }
+                                wl = getNewWriter(family, conf, null);
+                            } else {
+                                if(LOG.isDebugEnabled()) {
+                                    LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                                }
+                                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+                            }
+                        }
+                    } else {
+                        wl = getNewWriter(family, conf, null);
+                    }
                 }
                 kv.updateLatestStamp(this.now);
                 wl.writer.append(kv);
@@ -199,7 +259,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             }
 
             @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
-            private WriterLength getNewWriter(byte[] family, Configuration conf) throws IOException {
+            private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes)
+                    throws IOException {
                 WriterLength wl = new WriterLength();
                 Path familydir = new Path(outputdir, Bytes.toString(family));
                 Algorithm compression = compressionMap.get(family);
@@ -219,9 +280,15 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                 contextBuilder.withDataBlockEncoding(encoding);
                 HFileContext hFileContext = contextBuilder.build();
 
-                wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs).withOutputDir(familydir)
-                        .withBloomType(bloomType).withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
-                        .build();
+                if (null == favoredNodes) {
+                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
+                            .withFileContext(hFileContext).build();
+                } else {
+                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
+                            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
+                }
 
                 this.writers.put(family, wl);
                 return wl;
@@ -307,6 +374,49 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         }
     }
 
+    /**
+     * Writes {@code hConnectionConf} as XML to a new file under {@code tempDir}, appends that file to the job's
+     * "tmpfiles" entry and stores its name under {@code BULKLOAD_HCONNECTION_CONF_KEY}; returns the file written.
+     */
+    public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException {
+        File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml");
+        tempFile.deleteOnExit();
+
+        try (FileOutputStream os = new FileOutputStream(tempFile)) { // closed even when writeXml throws
+            hConnectionConf.writeXml(os);
+        }
+
+        String tempFileUri = fixWindowsPath("file://" + tempFile.getAbsolutePath());
+        String tmpFiles = job.getConfiguration().get("tmpfiles", null);
+        job.getConfiguration().set("tmpfiles", tmpFiles == null ? tempFileUri : tmpFiles + "," + tempFileUri);
+        LOG.info("A temporary file " + tempFile.getAbsolutePath()
+                + " is created for storing hconnection related configuration!!!");
+
+        job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName());
+        return tempFile;
+    }
+
+    public static Configuration getConfigureHConnection(Configuration jobConf) {
+        String confFileName = jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY); // file name stored by configureHConnection
+        if (Strings.isNullOrEmpty(confFileName)) {
+            return jobConf; // nothing stored under the key: use the job configuration itself
+        }
+        Configuration hConnectionConf = new Configuration(false);
+        hConnectionConf.addResource(new Path(new File(confFileName).toURI())); // load the stored XML by its URI
+        return hConnectionConf;
+    }
+
+    /** Repairs Windows file URIs, e.g. {@code file://C:\dir\f} becomes {@code file:///C:/dir/f}. */
+    public static String fixWindowsPath(String path) {
+        String fixed = path;
+        boolean lacksThirdSlash = fixed.startsWith("file://") && !fixed.startsWith("file:///");
+        if (lacksThirdSlash && fixed.contains(":\\")) {
+            fixed = fixed.replace("file://", "file:///");
+        }
+        // Backslashes are rewritten only once the path is in the three-slash form.
+        return fixed.startsWith("file:///") ? fixed.replace('\\', '/') : fixed;
+    }
+
     /**
      * Configure a MapReduce Job to perform an incremental load into the given
      * table. This
@@ -388,6 +498,12 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
                 ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
 
+        if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            // record this table name for creating writer by favored nodes
+            LOG.info("bulkload locality sensitive enabled");
+            conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
+        }
+        
         // Use table's region boundaries for TOP split points.
         LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
         List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);


[kylin] 07/07: KYLIN-4418 Bug fix for ShardingHash.getShard in HBaseLookupRowEncoder

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7886a2498178ded085497d6204b4b367f56ce0c8
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 16:44:15 2020 +0800

    KYLIN-4418 Bug fix for ShardingHash.getShard in HBaseLookupRowEncoder
---
 .../org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
index 7269465..51bd32b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
@@ -88,7 +88,8 @@ public class HBaseLookupRowEncoder extends AbstractLookupRowEncoder<HBaseRow> {
         }
         byte[] result = new byte[RowConstants.ROWKEY_SHARDID_LEN + keyByteBuffer.position()];
         System.arraycopy(keyByteBuffer.array(), 0, result, RowConstants.ROWKEY_SHARDID_LEN, keyByteBuffer.position());
-        short shard = ShardingHash.getShard(result, RowConstants.ROWKEY_SHARDID_LEN, result.length, shardNum);
+        short shard = ShardingHash.getShard(result, RowConstants.ROWKEY_SHARDID_LEN,
+                result.length - RowConstants.ROWKEY_SHARDID_LEN, shardNum);
         BytesUtil.writeShort(shard, result, 0, RowConstants.ROWKEY_SHARDID_LEN);
         return result;
     }


[kylin] 03/07: KYLIN-4416 Disable htable compaction

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 03274297b1dcdd26a93ee6903e764d92e8847a85
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 14:55:35 2020 +0800

    KYLIN-4416 Disable htable compaction
---
 .../src/main/java/org/apache/kylin/common/KylinConfigBase.java        | 4 ++++
 .../java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java    | 4 ++--
 .../java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java     | 1 +
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7136a51..b0f7fec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1263,6 +1263,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.storage.hbase.htable-available-retry", "3"));
     }
 
+    public int getHBaseRegionCompactionThreshold() { // read by CreateHTableJob when deriving the HFile size per region
+        return Integer.parseInt(getOptional("kylin.storage.hbase.region-compaction-threshold", "3"));
+    }
+
     public int getHBaseRegionCountMin() {
         return Integer.parseInt(getOptional("kylin.storage.hbase.min-region-count", "1"));
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 7c970c2..4315957 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -280,8 +280,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
             hfileSizeMB = mbPerRegion / 2f;
         }
 
-        int compactionThreshold = Integer.parseInt(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
-        logger.info("hbase.hstore.compactionThreshold is {}", compactionThreshold);
+        int compactionThreshold = kylinConfig.getHBaseRegionCompactionThreshold();
+        logger.info("kylin.storage.hbase.region-compaction-threshold is " + compactionThreshold);
         if (hfileSizeMB > 0.0f && hfileSizeMB * compactionThreshold < mbPerRegion) {
             hfileSizeMB = ((float) mbPerRegion) / compactionThreshold;
         }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 369c7bc..b99e46c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -66,6 +66,7 @@ public class CubeHTableUtil {
         KylinConfig kylinConfig = cubeDesc.getConfig();
 
         HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+        tableDesc.setCompactionEnabled(false);
         tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
         tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
         tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));