Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/27 03:10:22 UTC

[kylin] branch master-hadoop3 updated (66059a6 -> ada0355)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 66059a6  Upgrade tomcat version to 8.5.51
     new e230d8b  Fix dependency issue in HDP3 and revert code change to HBase bulk load
     new f2ac53a  KYLIN-4598 Missing dependency when running kylin.sh org.apache.kylin.*
     new ada0355  KYLIN-4597 Fix NPE when downloading diagnosis info for a job

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build/bin/kylin.sh                                 |   6 +
 .../kylin/common/util/CliCommandExecutor.java      |   2 +-
 core-metrics/pom.xml                               |   1 +
 pom.xml                                            |  19 +-
 server-base/pom.xml                                |   2 +-
 server/pom.xml                                     |   2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  14 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  22 +-
 .../storage/hbase/steps/HFileOutputFormat3.java    | 818 ++++++++++++---------
 stream-receiver/pom.xml                            |   4 +-
 10 files changed, 531 insertions(+), 359 deletions(-)


[kylin] 03/03: KYLIN-4597 Fix NPE when downloading diagnosis info for a job

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ada0355206c132a8afe171a695aec9b3eaf8b98d
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Sat Jun 27 11:07:06 2020 +0800

    KYLIN-4597 Fix NPE when downloading diagnosis info for a job
    
    Problems:
    When downloading diagnosis info for a job, it throws an NPE.
    
    Solutions:
    Don't replace '-' with '' in the job id; doing so changes the job id value and the job can no longer be found.
---
 .../src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 74ea1f9..bb08b4c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -163,7 +163,7 @@ public class CliCommandExecutor {
         }
     }
 
-    public static final String COMMAND_BLOCK_LIST = "[ &`>|{}()$;\\-#~!+*\\\\]+";
+    public static final String COMMAND_BLOCK_LIST = "[ &`>|{}()$;\\#~!+*\\\\]+";
     public static final String COMMAND_WHITE_LIST = "[^\\w%,@/:=?.\"\\[\\]]";
     public static final String HIVE_BLOCK_LIST = "[ <>()$;\\-#!+*\"'/=%@]+";
 

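For context, a minimal hypothetical sketch (not part of the patch) of why matching '-' in the block list caused the failure: Kylin job ids are UUID-style strings, so sanitizing one with the old pattern strips its dashes, the mangled id no longer matches any stored job, and the diagnosis download fails with the NPE described above. The class name and sample id below are made up for illustration.

    // Hypothetical demo, not from the Kylin sources.
    import java.util.regex.Pattern;

    public class BlockListDemo {
        // Old pattern from CliCommandExecutor before this commit; the fix drops "\\-".
        static final String OLD_COMMAND_BLOCK_LIST = "[ &`>|{}()$;\\-#~!+*\\\\]+";

        public static void main(String[] args) {
            String jobId = "9eb7bccf-4448-4578-9c29-552658b5a2ca"; // sample UUID-style job id
            // true: '-' is matched by the old block list
            System.out.println(Pattern.matches(OLD_COMMAND_BLOCK_LIST, "-"));
            // prints the id with its dashes removed, so it no longer identifies any job
            System.out.println(jobId.replaceAll(OLD_COMMAND_BLOCK_LIST, ""));
        }
    }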

[kylin] 01/03: Fix dependency issue in HDP3 and revert code change to HBase bulk load

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e230d8bac0e54ae96f2de69c560f6ff605d7e84c
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Jun 24 20:46:44 2020 +0800

    Fix dependency issue in HDP3 and revert code change to HBase bulk load
---
 core-metrics/pom.xml                               |   1 +
 pom.xml                                            |  19 +-
 server-base/pom.xml                                |   2 +-
 server/pom.xml                                     |   2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  14 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  22 +-
 .../storage/hbase/steps/HFileOutputFormat3.java    | 818 ++++++++++++---------
 stream-receiver/pom.xml                            |   4 +-
 8 files changed, 524 insertions(+), 358 deletions(-)
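
The dependency portion of this patch moves Guava and commons-configuration from provided to compile scope and removes the managed jetty-runner artifact, excluding jetty-runner from the Hive dependencies instead. As a rough, hypothetical illustration (not part of the patch) of why the scope matters: a provided-scope library compiles fine but is not packaged with Kylin, so at runtime it has to come from the cluster classpath, and the versions shipped with HDP 3 may not match what Kylin was built against.

    // Hypothetical demo, not from the Kylin sources: with Guava at compile scope
    // (as this commit sets) the class below is bundled with Kylin; at provided
    // scope it must be supplied by the Hadoop classpath, and a missing or
    // incompatible Guava surfaces as NoClassDefFoundError/NoSuchMethodError.
    import com.google.common.base.Joiner;

    public class ProvidedScopeDemo {
        public static void main(String[] args) {
            System.out.println(Joiner.on(",").join("kylin", "guava", "hdp3"));
            // Shows which jar Guava was actually loaded from on this classpath.
            System.out.println(Joiner.class.getProtectionDomain().getCodeSource());
        }
    }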

diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml
index eca9e5e..90056bb 100644
--- a/core-metrics/pom.xml
+++ b/core-metrics/pom.xml
@@ -48,6 +48,7 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <!-- Env & Test -->
diff --git a/pom.xml b/pom.xml
index 77e2338..7ffe25f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -594,7 +594,7 @@
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
-                <scope>provided</scope>
+                <scope>compile</scope>
             </dependency>
             <dependency>
                 <groupId>com.jcraft</groupId>
@@ -704,11 +704,23 @@
                 <groupId>org.apache.hive</groupId>
                 <artifactId>hive-jdbc</artifactId>
                 <version>${hive.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>jetty-runner</artifactId>
+                        <groupId>org.eclipse.jetty</groupId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.hive.hcatalog</groupId>
                 <artifactId>hive-hcatalog-core</artifactId>
                 <version>${hive-hcatalog.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>jetty-runner</artifactId>
+                        <groupId>org.eclipse.jetty</groupId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <!-- Yarn dependencies -->
             <dependency>
@@ -1084,11 +1096,6 @@
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
-                <artifactId>jetty-runner</artifactId>
-                <version>${jetty.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.eclipse.jetty</groupId>
                 <artifactId>jetty-util</artifactId>
                 <version>${jetty.version}</version>
                 <scope>test</scope>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 09a6858..cf8c380 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -266,7 +266,7 @@
         <dependency>
             <groupId>commons-configuration</groupId>
             <artifactId>commons-configuration</artifactId>
-            <scope>provided</scope>
+            <scope>compile</scope>
         </dependency>
     </dependencies>
 
diff --git a/server/pom.xml b/server/pom.xml
index dc7bb44..19c013c 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -35,7 +35,7 @@
         <dependency>
             <groupId>commons-configuration</groupId>
             <artifactId>commons-configuration</artifactId>
-            <scope>provided</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index b26f336..d961849 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.storage.hbase.steps;
 
@@ -135,7 +135,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
         HTable htable = (HTable) conn.getTable(TableName.valueOf(hbaseTableName));
 
-        HFileOutputFormat3.configureIncrementalLoadMap(job, htable);
+        HFileOutputFormat3.configureIncrementalLoadMap(job, htable.getDescriptor());
 
         logger.info("Saving HBase configuration to {}", hbaseConfPath);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
@@ -160,7 +160,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     }
 
     public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
-            final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+                                                               final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
             throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
@@ -258,7 +258,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     }
 
     protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
-            final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+                                          final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
 
         if (outputFolder == null) {
             logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -346,4 +346,4 @@ public class CreateHTableJob extends AbstractHadoopJob {
         int exitCode = ToolRunner.run(new CreateHTableJob(), args);
         System.exit(exitCode);
     }
-}
+}
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 28752ca..1e0e216 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -6,20 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.storage.hbase.steps;
 
-import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
-
 import java.io.IOException;
 import java.util.Collection;
 
@@ -54,6 +52,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
+
 /**
  * @author George Song (ysong1)
  */
@@ -61,7 +61,6 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class);
 
-    @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
 
@@ -84,7 +83,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
-            // construct configuration for the MR job cluster
+            // use current hbase configuration
             Configuration configuration = new Configuration(HBaseConnection.getCurrentHBaseConfiguration());
             String[] allServices = getAllServices(configuration);
             merge(configuration, getConf());
@@ -110,7 +109,6 @@ public class CubeHFileJob extends AbstractHadoopJob {
             RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName));
             // Automatic config !
             HFileOutputFormat3.configureIncrementalLoad(job, table, regionLocator);
-            HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir());
             reconfigurePartitions(hbaseConf, partitionFilePath);
 
             job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -121,7 +119,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class);
 
             // set block replication to 3 for hfiles
-            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);
 
@@ -173,9 +171,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     private String[] getAllServices(Configuration hbaseConf) {
         Collection<String> hbaseHdfsServices
-            = hbaseConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
+                = hbaseConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
         Collection<String> mainNameServices
-            = getConf().getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
+                = getConf().getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
         mainNameServices.addAll(hbaseHdfsServices);
         return mainNameServices.toArray(new String[0]);
     }
@@ -185,4 +183,4 @@ public class CubeHFileJob extends AbstractHadoopJob {
         System.exit(exitCode);
     }
 
-}
+}
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 2f139b5..8579ded 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -14,49 +14,52 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
+
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.File;
-import java.io.FileOutputStream;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.HStoreFile;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -66,17 +69,21 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
+import org.apache.hadoop.hbase.mapreduce.CellSerialization;
+import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
+import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -89,71 +96,134 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.kylin.shaded.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
- * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788
- *
+ * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-4293|HBASE-22887
+ * <p>
  * Writes HFiles. Passed Cells must arrive in order.
  * Writes current time as the sequence id for the file. Sets the major compacted
  * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
  * all HFiles being written.
  * <p>
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
+ * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
-    static Log LOG = LogFactory.getLog(HFileOutputFormat3.class);
+public class HFileOutputFormat3
+        extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+    private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat3.class);
+
+    static class TableInfo {
+        private TableDescriptor tableDesctiptor;
+        private RegionLocator regionLocator;
+
+        public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) {
+            this.tableDesctiptor = tableDesctiptor;
+            this.regionLocator = regionLocator;
+        }
+
+        /**
+         * The modification for the returned HTD doesn't affect the inner TD.
+         *
+         * @return A clone of inner table descriptor
+         * @deprecated use {@link #getTableDescriptor}
+         */
+        @Deprecated
+        public HTableDescriptor getHTableDescriptor() {
+            return new HTableDescriptor(tableDesctiptor);
+        }
+
+        public TableDescriptor getTableDescriptor() {
+            return tableDesctiptor;
+        }
+
+        public RegionLocator getRegionLocator() {
+            return regionLocator;
+        }
+    }
+
+    protected static final byte[] tableSeparator = Bytes.toBytes(";");
+
+    protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
+        return Bytes.add(tableName, tableSeparator, suffix);
+    }
 
     // The following constants are private since these are used by
     // HFileOutputFormat2 to internally transfer data between job setup and
     // reducer run using conf.
     // These should not be changed by the client.
-    private static final String COMPRESSION_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
-    private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
-    private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+    static final String COMPRESSION_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.compression";
+    static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.bloomtype";
+    static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.blocksize";
+    static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
 
     // This constant is public since the client can modify this when setting
     // up their conf object and thus refer to this symbol.
     // It is present for backwards compatibility reasons. Use it only to
     // override the auto-detection of datablock encoding.
-    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
     /**
      * Keep locality while generating HFiles for bulkload. See HBASE-12596
      */
-    public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled";
+    public static final String LOCALITY_SENSITIVE_CONF_KEY =
+            "hbase.bulkload.locality.sensitive.enabled";
     private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-    private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
+    static final String OUTPUT_TABLE_NAME_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.table.name";
+    static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+            "hbase.mapreduce.use.multi.table.hfileoutputformat";
 
-    private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration";
+    public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
+    public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
 
     @Override
-    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
-            throws IOException, InterruptedException {
+    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+            final TaskAttemptContext context) throws IOException, InterruptedException {
         return createRecordWriter(context, this.getOutputCommitter(context));
     }
 
-    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(final TaskAttemptContext context,
-            final OutputCommitter committer) throws IOException, InterruptedException {
+    protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
+        return combineTableNameSuffix(tableName, family);
+    }
+
+    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+    createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
+            throws IOException {
 
         // Get the path of the temporary output file
-        final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
+        final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
         final Configuration conf = context.getConfiguration();
-        LOG.debug("Task output path: " + outputdir);
-        final FileSystem fs = outputdir.getFileSystem(conf);
+        final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+        final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+        if (writeTableNames == null || writeTableNames.isEmpty()) {
+            throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
+                    + " cannot be empty");
+        }
+        final FileSystem fs = outputDir.getFileSystem(conf);
         // These configs. are from hbase-*.xml
-        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
+        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+                HConstants.DEFAULT_MAX_FILE_SIZE);
         // Invented config.  Add to hbase-*.xml if other than default compression.
-        final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
-        final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
-        final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
-                false);
+        final String defaultCompressionStr = conf.get("hfile.compression",
+                Compression.Algorithm.NONE.getName());
+        final Algorithm defaultCompression = HFileWriterImpl
+                .compressionByName(defaultCompressionStr);
+        final boolean compactionExclude = conf.getBoolean(
+                "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+        final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
+                Bytes.toString(tableSeparator))).collect(Collectors.toSet());
 
         // create a map from column family to the compression algorithm
         final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
@@ -161,7 +231,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
 
         String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-        final Map<byte[], DataBlockEncoding> datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
+        final Map<byte[], DataBlockEncoding> datablockEncodingMap
+                = createFamilyDataBlockEncodingMap(conf);
         final DataBlockEncoding overriddenEncoding;
         if (dataBlockEncodingStr != null) {
             overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
@@ -169,55 +240,83 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             overriddenEncoding = null;
         }
 
-        final Configuration hConnectionConf = getConfigureHConnection(conf);
-        
         return new RecordWriter<ImmutableBytesWritable, V>() {
             // Map of families to writers and how much has been output on the writer.
-            private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
-            private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-            private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
-            private boolean rollRequested = false;
+            private final Map<byte[], WriterLength> writers =
+                    new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            private final Map<byte[], byte[]> previousRows =
+                    new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            private final long now = EnvironmentEdgeManager.currentTime();
 
             @Override
-            public void write(ImmutableBytesWritable row, V cell) throws IOException {
-                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            public void write(ImmutableBytesWritable row, V cell)
+                    throws IOException {
+                Cell kv = cell;
+                // null input == user explicitly wants to flush
                 if (row == null && kv == null) {
-                    rollWriters();
+                    rollWriters(null);
                     return;
                 }
+
                 byte[] rowKey = CellUtil.cloneRow(kv);
-                long length = kv.getLength();
+                int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
                 byte[] family = CellUtil.cloneFamily(kv);
-                WriterLength wl = this.writers.get(family);
-                if (wl == null) {
-                    fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+                byte[] tableNameBytes = null;
+                if (writeMultipleTables) {
+                    tableNameBytes = HFileOutputFormat3.getTableName(row.get());
+                    if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
+                        throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
+                                "' not" + " expected");
+                    }
+                } else {
+                    tableNameBytes = Bytes.toBytes(writeTableNames);
                 }
-                if (wl != null && wl.written + length >= maxsize) {
-                    this.rollRequested = true;
+                byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
+                WriterLength wl = this.writers.get(tableAndFamily);
+
+                // If this is a new column family, verify that the directory exists
+                if (wl == null) {
+                    Path writerPath = null;
+                    if (writeMultipleTables) {
+                        writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
+                                .toString(family)));
+                    } else {
+                        writerPath = new Path(outputDir, Bytes.toString(family));
+                    }
+                    fs.mkdirs(writerPath);
+                    configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
                 }
-                if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-                    rollWriters();
+
+                // This can only happen once a row is finished though
+                if (wl != null && wl.written + length >= maxsize
+                        && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
+                    rollWriters(wl);
                 }
+
+                // create a new WAL writer, if necessary
                 if (wl == null || wl.writer == null) {
                     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
                         HRegionLocation loc = null;
-                        String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+
+                        String tableName = Bytes.toString(tableNameBytes);
                         if (tableName != null) {
-                            try (Connection connection = ConnectionFactory.createConnection(hConnectionConf);
-                                    RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
+                            try (Connection connection = ConnectionFactory.createConnection(conf);
+                                 RegionLocator locator =
+                                         connection.getRegionLocator(TableName.valueOf(tableName))) {
                                 loc = locator.getRegionLocation(rowKey);
                             } catch (Throwable e) {
-                                LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e);
+                                LOG.warn("There's something wrong when locating rowkey: " +
+                                        Bytes.toString(rowKey) + " for tablename: " + tableName, e);
                                 loc = null;
                             }
                         }
 
                         if (null == loc) {
                             if (LOG.isTraceEnabled()) {
-                                LOG.trace("failed to get region location, so use default writer: " +
+                                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                                         Bytes.toString(rowKey));
                             }
-                            wl = getNewWriter(family, conf, null);
+                            wl = getNewWriter(tableNameBytes, family, conf, null);
                         } else {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
@@ -229,86 +328,124 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                                     LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                                             + loc.getPort() + ", so use default writer");
                                 }
-                                wl = getNewWriter(family, conf, null);
+                                wl = getNewWriter(tableNameBytes, family, conf, null);
                             } else {
-                                if(LOG.isDebugEnabled()) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                                 }
-                                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+                                wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[]{initialIsa
+                                });
                             }
                         }
                     } else {
-                        wl = getNewWriter(family, conf, null);
+                        wl = getNewWriter(tableNameBytes, family, conf, null);
                     }
                 }
-                kv.updateLatestStamp(this.now);
+
+                // we now have the proper WAL writer. full steam ahead
+                PrivateCellUtil.updateLatestStamp(cell, this.now);
                 wl.writer.append(kv);
                 wl.written += length;
-                this.previousRow = rowKey;
+
+                // Copy the row so we know when a row transition.
+                this.previousRows.put(family, rowKey);
             }
 
-            private void rollWriters() throws IOException {
-                for (WriterLength wl : this.writers.values()) {
-                    if (wl.writer != null) {
-                        LOG.info("Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-                        close(wl.writer);
+            private void rollWriters(WriterLength writerLength) throws IOException {
+                if (writerLength != null) {
+                    closeWriter(writerLength);
+                } else {
+                    for (WriterLength wl : this.writers.values()) {
+                        closeWriter(wl);
                     }
-                    wl.writer = null;
-                    wl.written = 0;
                 }
-                this.rollRequested = false;
             }
 
-            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
-            private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes)
-                    throws IOException {
+            private void closeWriter(WriterLength wl) throws IOException {
+                if (wl.writer != null) {
+                    LOG.info(
+                            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
+                    close(wl.writer);
+                }
+                wl.writer = null;
+                wl.written = 0;
+            }
+
+            /*
+             * Create a new StoreFile.Writer.
+             * @param family
+             * @return A WriterLength, containing a new StoreFile.Writer.
+             * @throws IOException
+             */
+            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
+                    justification = "Not important")
+            private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
+                    conf, InetSocketAddress[] favoredNodes) throws IOException {
+                byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
+                Path familydir = new Path(outputDir, Bytes.toString(family));
+                if (writeMultipleTables) {
+                    familydir = new Path(outputDir,
+                            new Path(Bytes.toString(tableName), Bytes.toString(family)));
+                }
                 WriterLength wl = new WriterLength();
-                Path familydir = new Path(outputdir, Bytes.toString(family));
-                Algorithm compression = compressionMap.get(family);
+                Algorithm compression = compressionMap.get(tableAndFamily);
                 compression = compression == null ? defaultCompression : compression;
-                BloomType bloomType = bloomTypeMap.get(family);
+                BloomType bloomType = bloomTypeMap.get(tableAndFamily);
                 bloomType = bloomType == null ? BloomType.NONE : bloomType;
-                Integer blockSize = blockSizeMap.get(family);
+                Integer blockSize = blockSizeMap.get(tableAndFamily);
                 blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
                 DataBlockEncoding encoding = overriddenEncoding;
-                encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+                encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
                 encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
                 Configuration tempConf = new Configuration(conf);
                 tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-                HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
+                HFileContextBuilder contextBuilder = new HFileContextBuilder()
+                        .withCompression(compression)
                         .withChecksumType(HStore.getChecksumType(conf))
-                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
+                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+                        .withBlockSize(blockSize);
+
+                if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+                    contextBuilder.withIncludesTags(true);
+                }
+
                 contextBuilder.withDataBlockEncoding(encoding);
                 HFileContext hFileContext = contextBuilder.build();
-
                 if (null == favoredNodes) {
-                    StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
-                    wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
-                            .withComparator(new CellComparatorImpl.MetaCellComparator()).withFileContext(hFileContext).build();
+                    wl.writer =
+                            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+                                    .withOutputDir(familydir).withBloomType(bloomType)
+                                    .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
                 } else {
-                    StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
-                    wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
-                            .withComparator(new CellComparatorImpl.MetaCellComparator())
-                            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
+                    wl.writer =
+                            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                                    .withOutputDir(familydir).withBloomType(bloomType)
+                                    .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+                                    .withFavoredNodes(favoredNodes).build();
                 }
 
-                this.writers.put(family, wl);
+                this.writers.put(tableAndFamily, wl);
                 return wl;
             }
 
             private void close(final StoreFileWriter w) throws IOException {
                 if (w != null) {
-                    w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
-                    w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
-                    w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
-                    w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
+                    w.appendFileInfo(BULKLOAD_TIME_KEY,
+                            Bytes.toBytes(System.currentTimeMillis()));
+                    w.appendFileInfo(BULKLOAD_TASK_KEY,
+                            Bytes.toBytes(context.getTaskAttemptID().toString()));
+                    w.appendFileInfo(MAJOR_COMPACTION_KEY,
+                            Bytes.toBytes(true));
+                    w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+                            Bytes.toBytes(compactionExclude));
                     w.appendTrackedTimestampsToMetadata();
                     w.close();
                 }
             }
 
             @Override
-            public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+            public void close(TaskAttemptContext c)
+                    throws IOException, InterruptedException {
                 for (WriterLength wl : this.writers.values()) {
                     close(wl.writer);
                 }
@@ -316,6 +453,21 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         };
     }
 
+    /**
+     * Configure block storage policy for CF after the directory is created.
+     */
+    static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+                                       byte[] tableAndFamily, Path cfPath) {
+        if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
+            return;
+        }
+
+        String policy =
+                conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
+                        conf.get(STORAGE_POLICY_PROPERTY));
+        FSUtils.setStoragePolicy(fs, cfPath, policy);
+    }
+
     /*
      * Data structure to hold a Writer and amount of data written on it.
      */
@@ -328,11 +480,27 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * Return the start keys of all of the regions in this table,
      * as a list of ImmutableBytesWritable.
      */
-    private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException {
-        byte[][] byteKeys = table.getStartKeys();
-        ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(byteKeys.length);
-        for (byte[] byteKey : byteKeys) {
-            ret.add(new ImmutableBytesWritable(byteKey));
+    private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
+                                                                   boolean writeMultipleTables)
+            throws IOException {
+
+        ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
+        for (RegionLocator regionLocator : regionLocators) {
+            TableName tableName = regionLocator.getName();
+            LOG.info("Looking up current regions for table " + tableName);
+            byte[][] byteKeys = regionLocator.getStartKeys();
+            for (byte[] byteKey : byteKeys) {
+                byte[] fullKey = byteKey; //HFileOutputFormat2 use case
+                if (writeMultipleTables) {
+                    //MultiTableHFileOutputFormat use case
+                    fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
+                            + Bytes.toStringBinary(fullKey) + "]");
+                }
+                ret.add(new ImmutableBytesWritable(fullKey));
+            }
         }
         return ret;
     }
@@ -342,8 +510,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * {@link TotalOrderPartitioner} that contains the split points in startKeys.
      */
     @SuppressWarnings("deprecation")
-    private static void writePartitions(Configuration conf, Path partitionsPath, List<ImmutableBytesWritable> startKeys)
-            throws IOException {
+    private static void writePartitions(Configuration conf, Path partitionsPath,
+                                        List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
         LOG.info("Writing partition information to " + partitionsPath);
         if (startKeys.isEmpty()) {
             throw new IllegalArgumentException("No regions passed");
@@ -353,18 +521,22 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         // have keys < the first region (which has an empty start key)
         // so we need to remove it. Otherwise we would end up with an
         // empty reducer with index 0
-        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(startKeys);
-
+        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
         ImmutableBytesWritable first = sorted.first();
-        if (!Arrays.equals(first.get(), HConstants.EMPTY_BYTE_ARRAY)) {
-            throw new IllegalArgumentException("First region of table should have empty start key. Instead has: "
-                    + Bytes.toStringBinary(first.get()));
+        if (writeMultipleTables) {
+            first = new ImmutableBytesWritable(HFileOutputFormat3.getSuffix(sorted.first().get()));
+        }
+        if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+            throw new IllegalArgumentException(
+                    "First region of table should have empty start key. Instead has: "
+                            + Bytes.toStringBinary(first.get()));
         }
-        sorted.remove(first);
+        sorted.remove(sorted.first());
 
         // Write the actual file
         FileSystem fs = partitionsPath.getFileSystem(conf);
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, ImmutableBytesWritable.class,
+        SequenceFile.Writer writer = SequenceFile.createWriter(
+                fs, conf, partitionsPath, ImmutableBytesWritable.class,
                 NullWritable.class);
 
         try {
@@ -376,70 +548,6 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         }
     }
 
-    public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException {
-        File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml");
-        tempFile.deleteOnExit();
-
-        FileOutputStream os = new FileOutputStream(tempFile);
-        hConnectionConf.writeXml(os);
-        os.close();
-
-        String tmpFiles = job.getConfiguration().get("tmpfiles", null);
-        if (tmpFiles == null) {
-            tmpFiles = fixWindowsPath("file://" + tempFile.getAbsolutePath());
-        } else {
-            tmpFiles += "," + fixWindowsPath("file://" + tempFile.getAbsolutePath());
-        }
-        job.getConfiguration().set("tmpfiles", tmpFiles);
-        LOG.info("A temporary file " + tempFile.getAbsolutePath()
-                + " is created for storing hconnection related configuration!!!");
-
-        job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName());
-        return tempFile;
-    }
-
-    public static Configuration getConfigureHConnection(Configuration jobConf) {
-        if (Strings.isNullOrEmpty(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY))) {
-            return jobConf;
-        }
-        File tempFile = new File(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY));
-        Configuration hConnectionConf = new Configuration(false);
-        hConnectionConf.addResource(new Path(tempFile.toURI()));
-        return hConnectionConf;
-    }
-
-    public static String fixWindowsPath(String path) {
-        // fix windows path
-        if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
-            path = path.replace("file://", "file:///");
-        }
-        if (path.startsWith("file:///")) {
-            path = path.replace('\\', '/');
-        }
-        return path;
-    }
-
-    /**
-     * Configure a MapReduce Job to perform an incremental load into the given
-     * table. This
-     * <ul>
-     *   <li>Inspects the table to configure a total order partitioner</li>
-     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-     *     PutSortReducer)</li>
-     * </ul>
-     * The user should be sure to set the map output value class to either KeyValue or Put before
-     * running this function.
-     *
-     * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
-     */
-    @Deprecated
-    public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
-        configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
-    }
-
     /**
      * Configure a MapReduce Job to perform an incremental load into the given
      * table. This
@@ -454,8 +562,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * The user should be sure to set the map output value class to either KeyValue or Put before
      * running this function.
      */
-    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) throws IOException {
-        configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+            throws IOException {
+        configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
     }
 
     /**
@@ -472,23 +581,34 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * The user should be sure to set the map output value class to either KeyValue or Put before
      * running this function.
      */
-    public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator)
-            throws IOException {
-        configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class);
+    public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
+                                                RegionLocator regionLocator) throws IOException {
+        ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
+        singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
+        configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat3.class);
     }
 
-    static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
-            Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
+    static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
+                                         Class<? extends OutputFormat<?, ?>> cls) throws IOException {
         Configuration conf = job.getConfiguration();
         job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(KeyValue.class);
+        job.setOutputValueClass(MapReduceExtendedCell.class);
         job.setOutputFormatClass(cls);
 
+        if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
+            throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
+        }
+        boolean writeMultipleTables = false;
+        if (MultiTableHFileOutputFormat.class.equals(cls)) {
+            writeMultipleTables = true;
+            conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+        }
         // Based on the configured map output class, set the correct reducer to properly
         // sort the incoming values.
         // TODO it would be nice to pick one or the other of these formats.
-        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-            job.setReducerClass(KeyValueSortReducer.class);
+        if (KeyValue.class.equals(job.getMapOutputValueClass())
+                || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(CellSortReducer.class);
         } else if (Put.class.equals(job.getMapOutputValueClass())) {
             job.setReducerClass(PutSortReducer.class);
         } else if (Text.class.equals(job.getMapOutputValueClass())) {
@@ -497,50 +617,75 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
         }
 
-        conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
-                ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
+        conf.setStrings("io.serializations", conf.get("io.serializations"),
+                MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+                CellSerialization.class.getName());
 
         if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            // record this table name for creating writer by favored nodes
             LOG.info("bulkload locality sensitive enabled");
-            conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
         }
-        
+
+        /* Now get the region start keys for every table required */
+        List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
+        List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
+        List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
+
+        for (TableInfo tableInfo : multiTableInfo) {
+            regionLocators.add(tableInfo.getRegionLocator());
+            allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
+            tableDescriptors.add(tableInfo.getTableDescriptor());
+        }
+        // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table
+        conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
+                .toString(tableSeparator)));
+        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
         // Use table's region boundaries for TOP split points.
-        LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
-        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
-        LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
+        LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+                "to match current region count for all tables");
         job.setNumReduceTasks(startKeys.size());
 
-        configurePartitioner(job, startKeys);
+        configurePartitioner(job, startKeys, writeMultipleTables);
         // Set compression algorithms based on column families
-        configureCompression(conf, tableDescriptor);
-        configureBloomType(tableDescriptor, conf);
-        configureBlockSize(tableDescriptor, conf);
-        configureDataBlockEncoding(tableDescriptor, conf);
+
+        conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
+                tableDescriptors));
+        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
+                tableDescriptors));
+        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
+                tableDescriptors));
+        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
 
         TableMapReduceUtil.addDependencyJars(job);
         TableMapReduceUtil.initCredentials(job);
-        LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
+        LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
     }
 
-    public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
+    public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
+            IOException {
         Configuration conf = job.getConfiguration();
 
         job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(KeyValue.class);
+        job.setOutputValueClass(MapReduceExtendedCell.class);
         job.setOutputFormatClass(HFileOutputFormat3.class);
 
+        ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
+        singleTableDescriptor.add(tableDescriptor);
+
+        conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
         // Set compression algorithms based on column families
-        configureCompression(conf, table.getTableDescriptor());
-        configureBloomType(table.getTableDescriptor(), conf);
-        configureBlockSize(table.getTableDescriptor(), conf);
-        HTableDescriptor tableDescriptor = table.getTableDescriptor();
-        configureDataBlockEncoding(tableDescriptor, conf);
+        conf.set(COMPRESSION_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
+        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
+        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
+        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+                serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
 
         TableMapReduceUtil.addDependencyJars(job);
         TableMapReduceUtil.initCredentials(job);
-        LOG.info("Incremental table " + table.getName() + " output configured.");
+        LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
     }
 
     /**
@@ -551,9 +696,11 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * @return a map from column family to the configured compression algorithm
      */
     @VisibleForTesting
-    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
-        Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
+    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
+                                                                     conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                COMPRESSION_FAMILIES_CONF_KEY);
+        Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
             compressionMap.put(e.getKey(), algorithm);
@@ -570,8 +717,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      */
     @VisibleForTesting
     static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
-        Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                BLOOM_TYPE_FAMILIES_CONF_KEY);
+        Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             BloomType bloomType = BloomType.valueOf(e.getValue());
             bloomTypeMap.put(e.getKey(), bloomType);
@@ -588,8 +736,9 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      */
     @VisibleForTesting
     static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
-        Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                BLOCK_SIZE_FAMILIES_CONF_KEY);
+        Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             Integer blockSize = Integer.parseInt(e.getValue());
             blockSizeMap.put(e.getKey(), blockSize);
@@ -603,27 +752,31 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      *
      * @param conf to read the serialized values from
      * @return a map from column family to HFileDataBlockEncoder for the
-     *         configured data block type for the family
+     * configured data block type for the family
      */
     @VisibleForTesting
-    static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
-        Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
+    static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+            Configuration conf) {
+        Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+                DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+        Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
             encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
         }
         return encoderMap;
     }
 
+
     /**
      * Run inside the task to deserialize column family to given conf value map.
      *
-     * @param conf to read the serialized values from
+     * @param conf     to read the serialized values from
      * @param confName conf key to read from the configuration
      * @return a map of column family to the given configuration value
      */
-    private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
-        Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    private static Map<byte[], String> createFamilyConfValueMap(
+            Configuration conf, String confName) {
+        Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         String confVal = conf.get(confName, "");
         for (String familyConf : confVal.split("&")) {
             String[] familySplit = familyConf.split("=");
@@ -631,7 +784,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                 continue;
             }
             try {
-                confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
+                confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
                         URLDecoder.decode(familySplit[1], "UTF-8"));
             } catch (UnsupportedEncodingException e) {
                 // will not happen with UTF-8 encoding
@@ -645,13 +798,18 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
      * <code>splitPoints</code>. Cleans up the partitions file after job exists.
      */
-    static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
+    static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
+            writeMultipleTables)
+            throws IOException {
         Configuration conf = job.getConfiguration();
         // create the partitions file
         FileSystem fs = FileSystem.get(conf);
-        Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
+        String hbaseTmpFsDir =
+                conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+                        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+        Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + RandomUtil.randomUUID());
         fs.makeQualified(partitionsPath);
-        writePartitions(conf, partitionsPath, splitPoints);
+        writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
         fs.deleteOnExit(partitionsPath);
 
         // configure job to use it
@@ -659,134 +817,134 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
     }
 
-    /**
-     * Serialize column family to compression algorithm map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     *
-     * @param table to read the properties from
-     * @param conf to persist serialized values into
-     * @throws IOException
-     *           on failure to read column family descriptors
-     */
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
     @VisibleForTesting
-    static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
+    static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables)
             throws UnsupportedEncodingException {
-        StringBuilder compressionConfigValue = new StringBuilder();
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+        StringBuilder attributeValue = new StringBuilder();
         int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                compressionConfigValue.append('&');
+        for (TableDescriptor tableDescriptor : allTables) {
+            if (tableDescriptor == null) {
+                // could happen with mock table instance
+                // CODEREVIEW: Can I set an empty string in conf if mock table instance?
+                return "";
+            }
+            for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
+                if (i++ > 0) {
+                    attributeValue.append('&');
+                }
+                attributeValue.append(URLEncoder.encode(
+                        Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
+                        "UTF-8"));
+                attributeValue.append('=');
+                attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
             }
-            compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            compressionConfigValue.append('=');
-            compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
         }
         // Get rid of the last ampersand
-        conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+        return attributeValue.toString();
     }
 
     /**
-     * Serialize column family to block size map to configuration.
+     * Serialize column family to compression algorithm map to configuration.
      * Invoked while configuring the MR job for incremental load.
+     *
      * @param tableDescriptor to read the properties from
      * @param conf to persist serialized values into
+     * @throws IOException
+     * on failure to read column family descriptors
+     */
+    @VisibleForTesting
+    static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
+            familyDescriptor.getCompressionType().getName();
+
+    /**
+     * Serialize column family to block size map to configuration. Invoked while
+     * configuring the MR job for incremental load.
      *
+     * @param tableDescriptor
+     * to read the properties from
+     * @param conf
+     * to persist serialized values into
      * @throws IOException
-     *           on failure to read column family descriptors
+     * on failure to read column family descriptors
      */
     @VisibleForTesting
-    static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
-            throws UnsupportedEncodingException {
-        StringBuilder blockSizeConfigValue = new StringBuilder();
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                blockSizeConfigValue.append('&');
-            }
-            blockSizeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            blockSizeConfigValue.append('=');
-            blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
-        }
-        // Get rid of the last ampersand
-        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
-    }
+    static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
+            .valueOf(familyDescriptor.getBlocksize());
 
     /**
-     * Serialize column family to bloom type map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     * @param tableDescriptor to read the properties from
-     * @param conf to persist serialized values into
+     * Serialize column family to bloom type map to configuration. Invoked while
+     * configuring the MR job for incremental load.
      *
+     * @param tableDescriptor
+     * to read the properties from
+     * @param conf
+     * to persist serialized values into
      * @throws IOException
-     *           on failure to read column family descriptors
+     * on failure to read column family descriptors
      */
     @VisibleForTesting
-    static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
-            throws UnsupportedEncodingException {
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        StringBuilder bloomTypeConfigValue = new StringBuilder();
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                bloomTypeConfigValue.append('&');
-            }
-            bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            bloomTypeConfigValue.append('=');
-            String bloomType = familyDescriptor.getBloomFilterType().toString();
-            if (bloomType == null) {
-                bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
-            }
-            bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
+        String bloomType = familyDescriptor.getBloomFilterType().toString();
+        if (bloomType == null) {
+            bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
         }
-        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
-    }
+        return bloomType;
+    };
 
     /**
      * Serialize column family to data block encoding map to configuration.
      * Invoked while configuring the MR job for incremental load.
      *
-     * @param table to read the properties from
-     * @param conf to persist serialized values into
+     * @param tableDescriptor
+     * to read the properties from
+     * @param conf
+     * to persist serialized values into
      * @throws IOException
-     *           on failure to read column family descriptors
+     * on failure to read column family descriptors
      */
     @VisibleForTesting
-    static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, Configuration conf)
-            throws UnsupportedEncodingException {
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
+    static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
+        DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+        if (encoding == null) {
+            encoding = DataBlockEncoding.NONE;
         }
-        StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                dataBlockEncodingConfigValue.append('&');
-            }
-            dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            dataBlockEncodingConfigValue.append('=');
-            DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
-            if (encoding == null) {
-                encoding = DataBlockEncoding.NONE;
-            }
-            dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
+        return encoding.toString();
+    };
+
+    /**
+     * Copied from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+     * so that its protected functions can be used.
+     */
+    final private static int validateCompositeKey(byte[] keyBytes) {
+
+        int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
+
+        // Either the separator was not found or a tablename wasn't present or a key wasn't present
+        if (separatorIdx == -1) {
+            throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
+                    .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
         }
-        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
+        return separatorIdx;
     }
-}
+
+    /**
+     * Copied from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+     * so that its protected functions can be used.
+     */
+    protected static byte[] getTableName(byte[] keyBytes) {
+        int separatorIdx = validateCompositeKey(keyBytes);
+        return Bytes.copy(keyBytes, 0, separatorIdx);
+    }
+
+    /**
+     * Copied from HBase's org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+     * so that its protected functions can be used.
+     */
+
+    protected static byte[] getSuffix(byte[] keyBytes) {
+        int separatorIdx = validateCompositeKey(keyBytes);
+        return Bytes.copy(keyBytes, separatorIdx + 1, keyBytes.length - separatorIdx - 1);
+    }
+
+}
\ No newline at end of file
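For reference, a minimal driver sketch of how the new TableDescriptor/RegionLocator entry point above is typically wired up. The class, table name and output path below are placeholders rather than values taken from this patch, and the package of HFileOutputFormat3 is inferred from the file path in the diffstat:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.kylin.storage.hbase.steps.HFileOutputFormat3;

    public class BulkLoadDriverSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Job job = Job.getInstance(conf, "hfile-bulk-load-sketch");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("KYLIN_EXAMPLE")); // placeholder table
                 RegionLocator locator = conn.getRegionLocator(table.getName())) {
                // Sets the output key/value classes, CellSortReducer, the TotalOrderPartitioner
                // split points and the per-family compression/bloom/block-size/encoding strings.
                HFileOutputFormat3.configureIncrementalLoad(job, table.getDescriptor(), locator);
                FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles")); // placeholder path
            }
            // Mapper, input format and job submission are omitted in this sketch.
        }
    }

The Table-based overload at the top of the hunk behaves the same way, simply delegating with table.getDescriptor().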
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index fc0adf1..268e162 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -217,6 +217,7 @@
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
@@ -224,7 +225,8 @@
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-runner</artifactId>
+            <artifactId>jetty-util</artifactId>
+            <scope>compile</scope>
         </dependency>
 
         <dependency>


[kylin] 02/03: KYLIN-4598 Missing dependency when running kylin.sh org.apache.kylin.*

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f2ac53a32ffa3daeebe516b492759cd682ae1bf6
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Sat Jun 27 11:01:31 2020 +0800

    KYLIN-4598 Missing dependency when running kylin.sh org.apache.kylin.*
    
    Problems:
    It throws 'NoClassDefFoundError: org/apache/kafka/clients/producer/Producer' when running kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer.
    
    Solutions:
    Add 'export kafka_dependency' before exporting HBASE_CLASSPATH in kylin.sh.
---
 build/bin/kylin.sh | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 59d6de7..d56976b 100755
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -40,10 +40,16 @@ function retrieveDependency() {
         source ${dir}/cached-hive-dependency.sh
         source ${dir}/cached-hbase-dependency.sh
         source ${dir}/cached-hadoop-conf-dir.sh
+        source ${dir}/cached-kafka-dependency.sh
+        source ${dir}/cached-spark-dependency.sh
+        source ${dir}/cached-flink-dependency.sh
     else
         source ${dir}/find-hive-dependency.sh
         source ${dir}/find-hbase-dependency.sh
         source ${dir}/find-hadoop-conf-dir.sh
+        source ${dir}/find-kafka-dependency.sh
+        source ${dir}/find-spark-dependency.sh
+        source ${dir}/find-flink-dependency.sh
     fi
 
     #retrive $KYLIN_EXTRA_START_OPTS
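
With the extra dependency scripts sourced before HBASE_CLASSPATH is exported, a tool-class run such as the one from the bug report should find the Kafka producer classes again, for example (the --topic and --broker values are placeholders):

    bin/kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer --topic kylin_streaming_topic --broker localhost:9092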