You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/06/19 11:06:10 UTC
[kylin] 03/10: Fix dependency issues for branch of hadoop3
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bc02c2052d94033f46c14bad1778948bf6b60f02
Author: nic <ni...@apache.org>
AuthorDate: Fri Jun 19 17:52:33 2020 +0800
Fix dependency issues for branch of hadoop3
---
core-common/pom.xml | 4 +++
.../mr/common/DefaultSslProtocolSocketFactory.java | 0
pom.xml | 7 +++-
.../apache/kylin/rest/job/KylinHealthCheckJob.java | 11 ++++---
.../kylin/rest/job/StorageCleanJobHbaseUtil.java | 2 +-
.../org/apache/kylin/rest/service/CubeService.java | 5 +--
.../kylin/storage/hbase/steps/CreateHTableJob.java | 3 +-
.../kylin/storage/hbase/steps/CubeHFileJob.java | 9 ++---
.../kylin/storage/hbase/steps/CubeHTableUtil.java | 4 ++-
.../kylin/storage/hbase/steps/HBaseFlinkSteps.java | 1 +
.../kylin/storage/hbase/steps/HBaseSparkSteps.java | 1 +
.../storage/hbase/steps/HFileOutputFormat3.java | 38 ++++++++++++----------
.../storage/hbase/util/DeployCoprocessorCLI.java | 8 ++++-
.../kylin/storage/hbase/util/StorageCleanUtil.java | 13 ++++----
stream-receiver/pom.xml | 4 +++
.../kylin/tool/extractor/HBaseUsageExtractor.java | 2 +-
.../migration/CubeMigrationCrossClusterCLI.java | 7 ++--
.../kylin/tool/migration/DstClusterUtil.java | 2 +-
18 files changed, 72 insertions(+), 49 deletions(-)
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 6266558..407ab9c 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -114,5 +114,9 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-shaded-guava</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index e69de29..0000000
diff --git a/pom.xml b/pom.xml
index 30a4f8f..017859f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1069,6 +1069,11 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-runner</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
@@ -1243,7 +1248,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.0.0</version>
+ <version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
index 21fa784..bb3614a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
@@ -29,8 +29,9 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.BufferedLogger;
@@ -49,6 +50,7 @@ import org.apache.kylin.job.execution.CheckpointExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -199,13 +201,14 @@ public class KylinHealthCheckJob extends AbstractApplication {
private void checkHBaseTables(List<CubeInstance> cubes) throws IOException {
reporter.log("## Checking HBase Table of segments");
- HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseConfiguration.create());
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ Admin hbaseAdmin = conn.getAdmin();
try {
for (CubeInstance cube : cubes) {
for (CubeSegment segment : cube.getSegments()) {
if (segment.getStatus() != SegmentStatusEnum.NEW) {
String tableName = segment.getStorageLocationIdentifier();
- if ((!hbaseAdmin.tableExists(tableName)) || (!hbaseAdmin.isTableEnabled(tableName))) {
+ if ((!hbaseAdmin.tableExists(TableName.valueOf(tableName)) || (!hbaseAdmin.isTableEnabled(TableName.valueOf(tableName))))) {
reporter.log("HBase table: {} not exist for segment: {}, project: {}", tableName, segment,
cube.getProject());
reporter.log(
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
index c2784be..3822397 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
@@ -190,7 +190,7 @@ public class StorageCleanJobHbaseUtil {
} else {
logger.info("HBase table " + htableName + " does not exist");
}
- return htableName;
+ return htableName.getNameAsString();
}
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index f11f31a..50b0f55 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -30,7 +30,7 @@ import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -647,7 +647,8 @@ public class CubeService extends BasicService implements InitializingBean {
seg.getLastBuildJobID()));
}
- StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
+ Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ StorageCleanUtil.dropHTables(conn.getAdmin(), toDropHTables);
StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
}
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 271438c..b26f336 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
@@ -136,7 +135,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
HTable htable = (HTable) conn.getTable(TableName.valueOf(hbaseTableName));
- HFileOutputFormat2.configureIncrementalLoadMap(job, htable.getDescriptor());
+ HFileOutputFormat3.configureIncrementalLoadMap(job, htable);
logger.info("Saving HBase configuration to {}", hbaseConfPath);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index c5f40b4..86189e5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@@ -56,10 +55,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Locale;
-
-import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
-
/**
* @author George Song (ysong1)
*/
@@ -67,6 +62,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class);
+ @Override
public int run(String[] args) throws Exception {
Options options = new Options();
@@ -114,7 +110,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
Table table = connection.getTable(TableName.valueOf(hTableName));
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName));
// Automatic config !
- HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
+ HFileOutputFormat3.configureIncrementalLoad(job, table, regionLocator);
+ HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir());
reconfigurePartitions(hbaseConf, partitionFilePath);
job.setInputFormatClass(SequenceFileInputFormat.class);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 2e0f60a..fd682a8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -25,6 +25,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -125,7 +127,7 @@ public class CubeHTableUtil {
DeployCoprocessorCLI.deployCoprocessor(descBuilder);
admin.createTable(descBuilder.build(), splitKeys);
- Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
+ Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
logger.info("create hbase table " + tableName + " done.");
} finally {
IOUtils.closeQuietly(admin);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
index ddb2655..727cce3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
@@ -34,6 +34,7 @@ public class HBaseFlinkSteps extends HBaseJobSteps {
super(seg);
}
+ @Override
public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
String cuboidRootPath = getCuboidRootPath(jobId);
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index d636e7d..c593f2a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -35,6 +35,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
super(seg);
}
+ @Override
public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
String cuboidRootPath = getCuboidRootPath(jobId);
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 7c83b3c..2f139b5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -47,24 +48,24 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
@@ -74,7 +75,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -150,7 +151,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
- final Algorithm defaultCompression = AbstractHFileWriter.compressionByName(defaultCompressionStr);
+ final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
false);
@@ -281,12 +282,13 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
HFileContext hFileContext = contextBuilder.build();
if (null == favoredNodes) {
- wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
- .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
- .withFileContext(hFileContext).build();
+ StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
+ wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(new CellComparatorImpl.MetaCellComparator()).withFileContext(hFileContext).build();
} else {
- wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
- .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
+ StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs);
+ wl.writer = writerBuilder.withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(new CellComparatorImpl.MetaCellComparator())
.withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
}
@@ -294,12 +296,12 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
return wl;
}
- private void close(final StoreFile.Writer w) throws IOException {
+ private void close(final StoreFileWriter w) throws IOException {
if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
- w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
+ w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+ w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
+ w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
+ w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
w.appendTrackedTimestampsToMetadata();
w.close();
}
@@ -319,7 +321,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
*/
static class WriterLength {
long written = 0;
- StoreFile.Writer writer = null;
+ StoreFileWriter writer = null;
}
/**
@@ -553,7 +555,7 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable,
Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
+ Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
compressionMap.put(e.getKey(), algorithm);
}
return compressionMap;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 959b61b..3d08168 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
@@ -203,7 +204,7 @@ public class DeployCoprocessorCLI {
skipTableCnt ++;
continue;
}
- HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+ tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
String gitTag = tableDesc.getValue(IRealizationConstants.HTableGitTag);
if (commitInfo.equals(gitTag)) {
filteredList.add(tableName);
@@ -302,6 +303,11 @@ public class DeployCoprocessorCLI {
descBuilder.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
}
+ public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+ logger.info("Add coprocessor on " + desc.getNameAsString());
+ desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+ }
+
public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar)
throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
index a1259b8..0cae1e7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
@@ -21,7 +21,8 @@ package org.apache.kylin.storage.hbase.util;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,18 +36,18 @@ public class StorageCleanUtil {
/**
* this method will close hbaseAdmin after finishing the work.
*/
- public static void dropHTables(final HBaseAdmin hbaseAdmin, List<String> hTables) {
+ public static void dropHTables(final Admin hbaseAdmin, List<String> hTables) {
runSingleThreadTaskQuietly(() -> {
try {
for (String htable : hTables) {
logger.info("Deleting HBase table {}", htable);
- if (hbaseAdmin.tableExists(htable)) {
- if (hbaseAdmin.isTableEnabled(htable)) {
- hbaseAdmin.disableTable(htable);
+ if (hbaseAdmin.tableExists(TableName.valueOf(htable))) {
+ if (hbaseAdmin.isTableEnabled(TableName.valueOf(htable))) {
+ hbaseAdmin.disableTable(TableName.valueOf(htable));
}
- hbaseAdmin.deleteTable(htable);
+ hbaseAdmin.deleteTable(TableName.valueOf(htable));
logger.info("Deleted HBase table {}", htable);
} else {
logger.info("HBase table {} does not exist.", htable);
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index 411e750..fc0adf1 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -222,6 +222,10 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-runner</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java b/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java
index 3f4cf27..e8cf549 100644
--- a/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/extractor/HBaseUsageExtractor.java
@@ -90,7 +90,7 @@ public class HBaseUsageExtractor extends AbstractInfoExtractor {
extractor.execute(args);
}
- private String getHBaseMasterUrl() throws IOException, KeeperException {
+ private String getHBaseMasterUrl() {
String host = conf.get("hbase.master.info.bindAddress");
if (host.equals("0.0.0.0")) {
try {
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
index 95efab0..1932108 100644
--- a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.hadoop.tools.OptionsParser;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.JsonUtil;
@@ -693,9 +692,8 @@ public class CubeMigrationCrossClusterCLI extends AbstractApplication {
protected void copyHDFSPath(String srcDir, Configuration srcConf, String dstDir, Configuration dstConf)
throws Exception {
logger.info("start to copy hdfs directory from {} to {}", srcDir, dstDir);
- DistCpOptions distCpOptions = OptionsParser.parse(new String[] { srcDir, dstDir });
- distCpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
- distCpOptions.setBlocking(true);
+ DistCpOptions.Builder builder = new DistCpOptions.Builder(new Path(srcDir), new Path(dstDir));
+ DistCpOptions distCpOptions = builder.preserve(DistCpOptions.FileAttribute.BLOCKSIZE).withBlocking(true).build();
setTargetPathExists(distCpOptions);
DistCp distCp = new DistCp(getConfOfDistCp(), distCpOptions);
distCp.execute();
@@ -714,7 +712,6 @@ public class CubeMigrationCrossClusterCLI extends AbstractApplication {
Path target = inputOptions.getTargetPath();
FileSystem targetFS = target.getFileSystem(dstCluster.jobConf);
boolean targetExists = targetFS.exists(target);
- inputOptions.setTargetPathExists(targetExists);
dstCluster.jobConf.setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists);
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
index e578935..e172a48 100644
--- a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
@@ -297,7 +297,7 @@ public class DstClusterUtil extends ClusterUtil {
}
public void deployCoprocessor(HTableDescriptor tableDesc, String localCoprocessorJar) throws IOException {
- List<String> existingCoprocessors = tableDesc.getCoprocessors();
+ List<String> existingCoprocessors = (List<String>) tableDesc.getCoprocessors();
for (String existingCoprocessor : existingCoprocessors) {
tableDesc.removeCoprocessor(existingCoprocessor);
}