Posted to commits@kylin.apache.org by ni...@apache.org on 2020/06/19 11:06:10 UTC

[kylin] 03/10: Fix dependency issues for branch of hadoop3

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bc02c2052d94033f46c14bad1778948bf6b60f02
Author: nic <ni...@apache.org>
AuthorDate: Fri Jun 19 17:52:33 2020 +0800

    Fix dependency issues for branch of hadoop3
---
 core-common/pom.xml                                |  4 +++
 .../mr/common/DefaultSslProtocolSocketFactory.java |  0
 pom.xml                                            |  7 +++-
 .../apache/kylin/rest/job/KylinHealthCheckJob.java | 11 ++++---
 .../kylin/rest/job/StorageCleanJobHbaseUtil.java   |  2 +-
 .../org/apache/kylin/rest/service/CubeService.java |  5 +--
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  3 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  9 ++---
 .../kylin/storage/hbase/steps/CubeHTableUtil.java  |  4 ++-
 .../kylin/storage/hbase/steps/HBaseFlinkSteps.java |  1 +
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  1 +
 .../storage/hbase/steps/HFileOutputFormat3.java    | 38 ++++++++++++----------
 .../storage/hbase/util/DeployCoprocessorCLI.java   |  8 ++++-
 .../kylin/storage/hbase/util/StorageCleanUtil.java | 13 ++++----
 stream-receiver/pom.xml                            |  4 +++
 .../kylin/tool/extractor/HBaseUsageExtractor.java  |  2 +-
 .../migration/CubeMigrationCrossClusterCLI.java    |  7 ++--
 .../kylin/tool/migration/DstClusterUtil.java       |  2 +-
 18 files changed, 72 insertions(+), 49 deletions(-)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 6266558..407ab9c 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -114,5 +114,9 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-shaded-guava</artifactId>
+        </dependency>
     </dependencies>
 </project>
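
A note on the hunk above: kylin-shaded-guava gives core-common a relocated Guava, so it no longer collides with whatever Guava version Hadoop 3 ships. A minimal sketch of consuming it, assuming Kylin's usual relocation prefix org.apache.kylin.shaded.com.google.common (hypothetical if your build relocates elsewhere):

    import java.util.List;

    import org.apache.kylin.shaded.com.google.common.collect.Lists;

    public class ShadedGuavaSketch {
        public static void main(String[] args) {
            // Guava is reached through the relocated package, so it cannot
            // clash with the Guava that Hadoop 3 puts on the classpath.
            List<String> cubes = Lists.newArrayList("cube_a", "cube_b");
            System.out.println(cubes);
        }
    }
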
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index e69de29..0000000
diff --git a/pom.xml b/pom.xml
index 30a4f8f..017859f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1069,6 +1069,11 @@
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-runner</artifactId>
+                <version>${jetty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.jetty</groupId>
                 <artifactId>jetty-util</artifactId>
                 <version>${jetty.version}</version>
                 <scope>test</scope>
@@ -1243,7 +1248,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>
-                    <version>3.0.0</version>
+                    <version>3.1.0</version>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
index 21fa784..bb3614a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
@@ -29,8 +29,9 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.BufferedLogger;
@@ -49,6 +50,7 @@ import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -199,13 +201,14 @@ public class KylinHealthCheckJob extends AbstractApplication {
 
     private void checkHBaseTables(List<CubeInstance> cubes) throws IOException {
         reporter.log("## Checking HBase Table of segments");
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseConfiguration.create());
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         try {
             for (CubeInstance cube : cubes) {
                 for (CubeSegment segment : cube.getSegments()) {
                     if (segment.getStatus() != SegmentStatusEnum.NEW) {
                         String tableName = segment.getStorageLocationIdentifier();
-                        if ((!hbaseAdmin.tableExists(tableName)) || (!hbaseAdmin.isTableEnabled(tableName))) {
+                        if ((!hbaseAdmin.tableExists(TableName.valueOf(tableName)) || (!hbaseAdmin.isTableEnabled(TableName.valueOf(tableName))))) {
                             reporter.log("HBase table: {} not exist for segment: {}, project: {}", tableName, segment,
                                     cube.getProject());
                             reporter.log(
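
The hunk above replaces the HBaseAdmin constructor, removed in HBase 2.x, with an Admin obtained from a pooled Connection, and wraps raw table names in TableName. A standalone sketch of the same liveness check, assuming a plain client configuration instead of Kylin's HBaseConnection pool:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class SegmentTableCheckSketch {
        // True when the segment's HBase table both exists and is enabled.
        static boolean tableLive(String storageLocation) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                TableName tn = TableName.valueOf(storageLocation);
                // HBase 2.x Admin methods take TableName, not raw strings.
                return admin.tableExists(tn) && admin.isTableEnabled(tn);
            }
        }
    }

Unlike this sketch, the patch deliberately does not close the Connection: HBaseConnection.get() hands out a shared, pooled instance that callers must not close.
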
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
index c2784be..3822397 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
@@ -190,7 +190,7 @@ public class StorageCleanJobHbaseUtil {
             } else {
                 logger.info("HBase table " + htableName + " does not exist");
             }
-            return htableName;
+            return htableName.getNameAsString();
         }
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index f11f31a..50b0f55 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -30,7 +30,7 @@ import java.util.Set;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -647,7 +647,8 @@ public class CubeService extends BasicService implements InitializingBean {
                         seg.getLastBuildJobID()));
             }
 
-            StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
+            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+            StorageCleanUtil.dropHTables(conn.getAdmin(), toDropHTables);
             StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
         }
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 271438c..b26f336 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
@@ -136,7 +135,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
         HTable htable = (HTable) conn.getTable(TableName.valueOf(hbaseTableName));
 
-        HFileOutputFormat2.configureIncrementalLoadMap(job, htable.getDescriptor());
+        HFileOutputFormat3.configureIncrementalLoadMap(job, htable);
 
         logger.info("Saving HBase configuration to {}", hbaseConfPath);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index c5f40b4..86189e5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -56,10 +55,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Locale;
-
-import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
-
 /**
  * @author George Song (ysong1)
  */
@@ -67,6 +62,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class);
 
+    @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
 
@@ -114,7 +110,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
             Table table = connection.getTable(TableName.valueOf(hTableName));
             RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName));
             // Automatic config !
-            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
+            HFileOutputFormat3.configureIncrementalLoad(job, table, regionLocator);
+            HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir());
             reconfigurePartitions(hbaseConf, partitionFilePath);
 
             job.setInputFormatClass(SequenceFileInputFormat.class);
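
configureIncrementalLoad here moves to Kylin's own HFileOutputFormat3, whose (Job, Table, RegionLocator) signature mirrors the stock HBase 2.x HFileOutputFormat2. For reference, a sketch of the standard wiring with a hypothetical table name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadWiringSketch {
        static void configure(Job job) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            TableName tn = TableName.valueOf("KYLIN_EXAMPLE"); // hypothetical
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(tn);
                 RegionLocator locator = conn.getRegionLocator(tn)) {
                // Sets the output format, reducer, and total-order partitioner
                // from the table's current region boundaries.
                HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            }
        }
    }
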
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 2e0f60a..fd682a8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -25,6 +25,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -125,7 +127,7 @@ public class CubeHTableUtil {
             DeployCoprocessorCLI.deployCoprocessor(descBuilder);
 
             admin.createTable(descBuilder.build(), splitKeys);
-            Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
+            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
             logger.info("create hbase table " + tableName + " done.");
         } finally {
             IOUtils.closeQuietly(admin);
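
The availability check now passes the table name straight through. For reference, a sketch of the surrounding create-then-verify pattern on the HBase 2.x builder API (family name and split keys hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

    public class CreateTableSketch {
        static void create(Admin admin, TableName tn, byte[][] splitKeys) throws IOException {
            TableDescriptorBuilder desc = TableDescriptorBuilder.newBuilder(tn)
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("F1")); // hypothetical family
            admin.createTable(desc.build(), splitKeys);
            if (!admin.isTableAvailable(tn)) {
                throw new IllegalStateException("table " + tn + " created, but is not available");
            }
        }
    }
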
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
index ddb2655..727cce3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
@@ -34,6 +34,7 @@ public class HBaseFlinkSteps extends HBaseJobSteps {
         super(seg);
     }
 
+    @Override
     public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         String cuboidRootPath = getCuboidRootPath(jobId);
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index d636e7d..c593f2a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -35,6 +35,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         super(seg);
     }
 
+    @Override
     public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         String cuboidRootPath = getCuboidRootPath(jobId);
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 7c83b3c..2f139b5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -47,24 +48,24 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
@@ -74,7 +75,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -150,7 +151,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
         // Invented config.  Add to hbase-*.xml if other than default compression.
         final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
-        final Algorithm defaultCompression = AbstractHFileWriter.compressionByName(defaultCompressionStr);
+        final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
         final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
                 false);
 
@@ -281,12 +282,13 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                 HFileContext hFileContext = contextBuilder.build();
 
                 if (null == favoredNodes) {
-                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
-                            .withFileContext(hFileContext).build();
+                    StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
+                    wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
+                            .withComparator(new CellComparatorImpl.MetaCellComparator()).withFileContext(hFileContext).build();
                 } else {
-                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
+                    StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
+                    wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
+                            .withComparator(new CellComparatorImpl.MetaCellComparator())
                             .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
                 }
 
@@ -294,12 +296,12 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
                 return wl;
             }
 
-            private void close(final StoreFile.Writer w) throws IOException {
+            private void close(final StoreFileWriter w) throws IOException {
                 if (w != null) {
-                    w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
-                    w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
-                    w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
-                    w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
+                    w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+                    w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
+                    w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
+                    w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
                     w.appendTrackedTimestampsToMetadata();
                     w.close();
                 }
@@ -319,7 +321,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
      */
     static class WriterLength {
         long written = 0;
-        StoreFile.Writer writer = null;
+        StoreFileWriter writer = null;
     }
 
     /**
@@ -553,7 +555,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
         Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
         Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
         for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-            Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
+            Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
             compressionMap.put(e.getKey(), algorithm);
         }
         return compressionMap;
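
This file is the core of the port: StoreFile.Writer and its WriterBuilder are gone in HBase 2.x, so writers are built through StoreFileWriter.Builder, the bulk-load file-info keys move from StoreFile to HStoreFile, and compressionByName now lives on HFileWriterImpl rather than the removed AbstractHFileWriter. A minimal sketch of the new writer construction, with a hypothetical output directory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.apache.hadoop.hbase.regionserver.HStoreFile;
    import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class StoreFileWriterSketch {
        static void write(Configuration conf, FileSystem fs) throws Exception {
            HFileContext ctx = new HFileContextBuilder().withBlockSize(64 * 1024).build();
            StoreFileWriter writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                    .withOutputDir(new Path("/tmp/hfiles/F1")) // hypothetical
                    .withBloomType(BloomType.NONE)
                    .withFileContext(ctx)
                    .build();
            // The bulk-load metadata keys are now constants on HStoreFile.
            writer.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY,
                    Bytes.toBytes(System.currentTimeMillis()));
            writer.close();
        }
    }
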
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 959b61b..3d08168 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
@@ -203,7 +204,7 @@ public class DeployCoprocessorCLI {
                 skipTableCnt ++;
                 continue;
             }
-            HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
             String gitTag = tableDesc.getValue(IRealizationConstants.HTableGitTag);
             if (commitInfo.equals(gitTag)) {
                 filteredList.add(tableName);
@@ -302,6 +303,11 @@ public class DeployCoprocessorCLI {
         descBuilder.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
     }
 
+    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + desc.getNameAsString());
+        desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+    }
+
     public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar)
             throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
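
The new addCoprocessorOnHTable overload covers callers that still hold an HTableDescriptor rather than a TableDescriptorBuilder. A sketch of invoking it directly on the descriptor (class name and jar path hypothetical; 1001 matches the priority used above):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HTableDescriptor;

    public class CoprocessorAttachSketch {
        static void attach(HTableDescriptor desc) throws IOException {
            Path jar = new Path("hdfs:///kylin/coprocessor/kylin-coprocessor.jar"); // hypothetical
            desc.addCoprocessor("org.apache.kylin.example.Endpoint", // hypothetical class
                    jar, 1001, null);
        }
    }
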
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
index a1259b8..0cae1e7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
@@ -21,7 +21,8 @@ package org.apache.kylin.storage.hbase.util;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,18 +36,18 @@ public class StorageCleanUtil {
     /**
      * this method will close hbaseAdmin after finishing the work.
      */
-    public static void dropHTables(final HBaseAdmin hbaseAdmin, List<String> hTables) {
+    public static void dropHTables(final Admin hbaseAdmin, List<String> hTables) {
         runSingleThreadTaskQuietly(() -> {
             try {
                 for (String htable : hTables) {
                     logger.info("Deleting HBase table {}", htable);
 
-                    if (hbaseAdmin.tableExists(htable)) {
-                        if (hbaseAdmin.isTableEnabled(htable)) {
-                            hbaseAdmin.disableTable(htable);
+                    if (hbaseAdmin.tableExists(TableName.valueOf(htable))) {
+                        if (hbaseAdmin.isTableEnabled(TableName.valueOf(htable))) {
+                            hbaseAdmin.disableTable(TableName.valueOf(htable));
                         }
 
-                        hbaseAdmin.deleteTable(htable);
+                        hbaseAdmin.deleteTable(TableName.valueOf(htable));
                         logger.info("Deleted HBase table {}", htable);
                     } else {
                         logger.info("HBase table {} does not exist.", htable);
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index 411e750..fc0adf1 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -222,6 +222,10 @@
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-servlet</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-runner</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>junit</groupId>
diff --git a/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java b/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java
index 3f4cf27..e8cf549 100644
--- a/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java
@@ -90,7 +90,7 @@ public class HBaseUsageExtractor extends AbstractInfoExtractor {
         extractor.execute(args);
     }
 
-    private String getHBaseMasterUrl() throws IOException, KeeperException {
+    private String getHBaseMasterUrl() {
         String host = conf.get("hbase.master.info.bindAddress");
         if (host.equals("0.0.0.0")) {
             try {
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
index 95efab0..1932108 100644
--- a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.hadoop.tools.OptionsParser;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.JsonUtil;
@@ -693,9 +692,8 @@ public class CubeMigrationCrossClusterCLI extends AbstractApplication {
     protected void copyHDFSPath(String srcDir, Configuration srcConf, String dstDir, Configuration dstConf)
             throws Exception {
         logger.info("start to copy hdfs directory from {} to {}", srcDir, dstDir);
-        DistCpOptions distCpOptions = OptionsParser.parse(new String[] { srcDir, dstDir });
-        distCpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
-        distCpOptions.setBlocking(true);
+        DistCpOptions.Builder builder = new DistCpOptions.Builder(new Path(srcDir), new Path(dstDir));
+        DistCpOptions distCpOptions = builder.preserve(DistCpOptions.FileAttribute.BLOCKSIZE).withBlocking(true).build();
         setTargetPathExists(distCpOptions);
         DistCp distCp = new DistCp(getConfOfDistCp(), distCpOptions);
         distCp.execute();
@@ -714,7 +712,6 @@ public class CubeMigrationCrossClusterCLI extends AbstractApplication {
         Path target = inputOptions.getTargetPath();
         FileSystem targetFS = target.getFileSystem(dstCluster.jobConf);
         boolean targetExists = targetFS.exists(target);
-        inputOptions.setTargetPathExists(targetExists);
         dstCluster.jobConf.setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists);
     }
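
DistCpOptions is immutable in Hadoop 3, so the parse-then-mutate flow (OptionsParser plus setters such as setBlocking and setTargetPathExists) gives way to DistCpOptions.Builder, with the target-exists flag pushed into the job configuration instead. A sketch under that assumption, using the list-of-sources constructor and hypothetical paths:

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpSketch {
        static void copy(Configuration conf) throws Exception {
            DistCpOptions options = new DistCpOptions.Builder(
                    Collections.singletonList(new Path("hdfs://src/kylin")), // hypothetical
                    new Path("hdfs://dst/kylin"))                            // hypothetical
                    .preserve(DistCpOptions.FileAttribute.BLOCKSIZE)
                    .withBlocking(true)
                    .build();
            new DistCp(conf, options).execute();
        }
    }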
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
index e578935..e172a48 100644
--- a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
@@ -297,7 +297,7 @@ public class DstClusterUtil extends ClusterUtil {
     }
 
     public void deployCoprocessor(HTableDescriptor tableDesc, String localCoprocessorJar) throws IOException {
-        List<String> existingCoprocessors = tableDesc.getCoprocessors();
+        List<String> existingCoprocessors = (List<String>) tableDesc.getCoprocessors();
         for (String existingCoprocessor : existingCoprocessors) {
             tableDesc.removeCoprocessor(existingCoprocessor);
         }
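
The cast above appears to be needed because getCoprocessors returns a Collection<String> rather than a List<String> in HBase 2.x. Since the cast assumes the runtime type happens to be a List, a defensive copy is the safer pattern; a hedged sketch:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.HTableDescriptor;

    public class CoprocessorStripSketch {
        static void stripAll(HTableDescriptor tableDesc) {
            // Copying avoids the List cast; it also keeps the loop safe even
            // if the returned collection were a live view of the descriptor.
            List<String> existing = new ArrayList<>(tableDesc.getCoprocessors());
            for (String coprocessor : existing) {
                tableDesc.removeCoprocessor(coprocessor);
            }
        }
    }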