You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/18 11:52:35 UTC

[kylin] 05/07: KYLIN-4414 bulkload needs to follow locality

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 8d2a53fcd7711a843b3f34eae5953a8bfff3d3be
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue Mar 10 15:29:11 2020 +0800

    KYLIN-4414 bulkload needs to follow locality
---
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  15 ++-
 .../storage/hbase/steps/HFileOutputFormat3.java    | 126 ++++++++++++++++++++-
 2 files changed, 131 insertions(+), 10 deletions(-)

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index c0fae42..e403c20 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -18,8 +18,11 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
+import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
+
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Locale;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
@@ -47,8 +50,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
-
 /**
  * @author George Song (ysong1)
  */
@@ -77,7 +78,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
-            // use current hbase configuration
+            // construct configuration for the MR job cluster
             Configuration configuration = new Configuration(HBaseConnection.getCurrentHBaseConfiguration());
             String[] allServices = getAllServices(configuration);
             merge(configuration, getConf());
@@ -95,10 +96,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             attachCubeMetadata(cube, job.getConfiguration());
 
-            HTable htable = new HTable(configuration, getOptionValue(OPTION_HTABLE_NAME));
+            // construct configuration for the HBase cluster
+            Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+            HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase(Locale.ROOT));
 
             // Automatic config !
             HFileOutputFormat3.configureIncrementalLoad(job, htable);
+            HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir());
+
             reconfigurePartitions(configuration, partitionFilePath);
 
             job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -109,7 +114,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class);
 
             // set block replication to 3 for hfiles
-            configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 12c30ea..e14d012 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -17,8 +17,11 @@
 */
 package org.apache.kylin.storage.hbase.steps;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
@@ -39,15 +42,20 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -82,6 +90,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.kylin.common.util.RandomUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 
 /**
  * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788
@@ -114,6 +123,15 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
     // override the auto-detection of datablock encoding.
     public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+    /**
+     * Keep locality while generating HFiles for bulkload. See HBASE-12596
+     */
+    public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled";
+    private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+    private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
+
+    private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration";
+
     @Override
     public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
             throws IOException, InterruptedException {
@@ -150,6 +168,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             overriddenEncoding = null;
         }
 
+        final Configuration hConnectionConf = getConfigureHConnection(conf);
+        
         return new RecordWriter<ImmutableBytesWritable, V>() {
             // Map of families to writers and how much has been output on the writer.
             private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
@@ -178,7 +198,47 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                     rollWriters();
                 }
                 if (wl == null || wl.writer == null) {
-                    wl = getNewWriter(family, conf);
+                    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+                        HRegionLocation loc = null;
+                        String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+                        if (tableName != null) {
+                            try (Connection connection = ConnectionFactory.createConnection(hConnectionConf);
+                                    RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
+                                loc = locator.getRegionLocation(rowKey);
+                            } catch (Throwable e) {
+                                LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e);
+                                loc = null;
+                            }
+                        }
+
+                        if (null == loc) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("failed to get region location, so use default writer: " +
+                                        Bytes.toString(rowKey));
+                            }
+                            wl = getNewWriter(family, conf, null);
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+                            }
+                            InetSocketAddress initialIsa =
+                                    new InetSocketAddress(loc.getHostname(), loc.getPort());
+                            if (initialIsa.isUnresolved()) {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+                                            + loc.getPort() + ", so use default writer");
+                                }
+                                wl = getNewWriter(family, conf, null);
+                            } else {
+                                if(LOG.isDebugEnabled()) {
+                                    LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                                }
+                                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+                            }
+                        }
+                    } else {
+                        wl = getNewWriter(family, conf, null);
+                    }
                 }
                 kv.updateLatestStamp(this.now);
                 wl.writer.append(kv);
@@ -199,7 +259,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
             }
 
             @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
-            private WriterLength getNewWriter(byte[] family, Configuration conf) throws IOException {
+            private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes)
+                    throws IOException {
                 WriterLength wl = new WriterLength();
                 Path familydir = new Path(outputdir, Bytes.toString(family));
                 Algorithm compression = compressionMap.get(family);
@@ -219,9 +280,15 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                 contextBuilder.withDataBlockEncoding(encoding);
                 HFileContext hFileContext = contextBuilder.build();
 
-                wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs).withOutputDir(familydir)
-                        .withBloomType(bloomType).withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
-                        .build();
+                if (null == favoredNodes) {
+                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
+                            .withFileContext(hFileContext).build();
+                } else {
+                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
+                            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
+                }
 
                 this.writers.put(family, wl);
                 return wl;
@@ -307,6 +374,49 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         }
     }
 
+    /** Writes {@code hConnectionConf} to a local XML file and registers it in the job's "tmpfiles" for tasks. */
+    public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException {
+        File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml");
+        tempFile.deleteOnExit();
+
+        try (FileOutputStream os = new FileOutputStream(tempFile)) {
+            hConnectionConf.writeXml(os); // try-with-resources: the stream is closed even if writeXml throws
+        }
+
+        // append the file URI to any "tmpfiles" entries already present in the job configuration
+        String tempFileUri = fixWindowsPath("file://" + tempFile.getAbsolutePath());
+        String tmpFiles = job.getConfiguration().get("tmpfiles", null);
+        tmpFiles = (tmpFiles == null) ? tempFileUri : tmpFiles + "," + tempFileUri;
+        job.getConfiguration().set("tmpfiles", tmpFiles);
+        LOG.info("A temporary file " + tempFile.getAbsolutePath()
+                + " is created for storing hconnection related configuration!!!");
+
+        // only the bare file name is recorded; getConfigureHConnection resolves it inside getRecordWriter
+        job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName());
+        return tempFile;
+    }
+
+    public static Configuration getConfigureHConnection(Configuration jobConf) {
+        String confFileName = jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY);
+        if (Strings.isNullOrEmpty(confFileName)) {
+            return jobConf; // nothing was recorded by configureHConnection: reuse the job configuration as-is
+        }
+        Configuration hConnectionConf = new Configuration(false); // loadDefaults=false: only the shipped file
+        hConnectionConf.addResource(new Path(new File(confFileName).toURI())); // relative to the working dir
+        return hConnectionConf;
+    }
+
+    /** Turns a "file://" URI built from a Windows path (e.g. "file://C:\dir") into "file:///C:/dir" form. */
+    public static String fixWindowsPath(String path) {
+        String fixed = path;
+        boolean missingSlash = fixed.startsWith("file://") && !fixed.startsWith("file:///") && fixed.contains(":\\");
+        if (missingSlash) {
+            fixed = fixed.replace("file://", "file:///");
+        }
+        // any "file:///" URI (repaired above or already well-formed) is normalized to forward slashes
+        return fixed.startsWith("file:///") ? fixed.replace('\\', '/') : fixed;
+    }
+
     /**
      * Configure a MapReduce Job to perform an incremental load into the given
      * table. This
@@ -388,6 +498,12 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
                 ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
 
+        if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            // record this table name for creating writer by favored nodes
+            LOG.info("bulkload locality sensitive enabled");
+            conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
+        }
+        
         // Use table's region boundaries for TOP split points.
         LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
         List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);