You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/03/07 06:13:34 UTC
[storm] 02/07: STORM-3349: Upgrade Hadoop, Hive, HDFS, HBase to latest compatible versions
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
commit 41fe57a9a65f8b0007d327f01442ecb8d0982521
Author: Stig Rohde Døssing <sd...@it-minds.dk>
AuthorDate: Wed Feb 27 13:26:06 2019 +0100
STORM-3349: Upgrade Hadoop, Hive, HDFS, HBase to latest compatible versions
---
.../storm/hbase/topology/WordCountClient.java | 6 ++-
examples/storm-jms-examples/pom.xml | 2 +-
external/storm-autocreds/pom.xml | 16 ++++++
.../org/apache/storm/hbase/common/HBaseClient.java | 14 ++---
.../java/org/apache/storm/hbase/common/Utils.java | 12 +++--
.../storm/hbase/trident/state/HBaseMapState.java | 6 +--
.../hbase/trident/windowing/HBaseWindowsStore.java | 21 ++++----
external/storm-hive/pom.xml | 1 +
.../storm/flux/examples/WordCountClient.java | 6 ++-
pom.xml | 63 ++++++++++++++++++++--
.../jvm/org/apache/storm/utils/JCQueueTest.java | 39 +++++++-------
11 files changed, 133 insertions(+), 53 deletions(-)
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
index b7874bd..33acfab 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
@@ -14,9 +14,11 @@ package org.apache.storm.hbase.topology;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -32,7 +34,7 @@ public class WordCountClient {
config.set("hbase.rootdir", args[0]);
}
- HTable table = new HTable(config, "WordCount");
+ Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf("WordCount"));
for (String word : WordSpout.words) {
diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml
index 10266c1..886066c 100644
--- a/examples/storm-jms-examples/pom.xml
+++ b/examples/storm-jms-examples/pom.xml
@@ -29,7 +29,7 @@
<artifactId>storm-jms-examples</artifactId>
<properties>
- <spring.version>5.0.4.RELEASE</spring.version>
+ <spring.version>5.1.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
index 6c69740..d819145 100644
--- a/external/storm-autocreds/pom.xml
+++ b/external/storm-autocreds/pom.xml
@@ -108,6 +108,22 @@
</exclusions>
</dependency>
<dependency>
+ <!-- Needed for TokenUtil, which moved here from hbase-client -->
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-streaming</artifactId>
<version>${hive.version}</version>
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
index 208e622..2d0c324 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
@@ -22,13 +22,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.security.HBaseSecurityUtil;
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
public class HBaseClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
- private HTable table;
+ private Table table;
public HBaseClient(Map<String, Object> map, final Configuration configuration, final String tableName) {
try {
@@ -60,14 +60,14 @@ public class HBaseClient implements Closeable {
put.setDurability(durability);
for (ColumnList.Column col : cols.getColumns()) {
if (col.getTs() > 0) {
- put.add(
+ put.addColumn(
col.getFamily(),
col.getQualifier(),
col.getTs(),
col.getValue()
);
} else {
- put.add(
+ put.addColumn(
col.getFamily(),
col.getQualifier(),
col.getValue()
@@ -82,9 +82,9 @@ public class HBaseClient implements Closeable {
inc.setDurability(durability);
for (ColumnList.Counter cnt : cols.getCounters()) {
inc.addColumn(
- cnt.getFamily(),
- cnt.getQualifier(),
- cnt.getIncrement()
+ cnt.getFamily(),
+ cnt.getQualifier(),
+ cnt.getIncrement()
);
}
mutations.add(inc);
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
index f9e6e34..defa092 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
@@ -16,7 +16,9 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
@@ -31,7 +33,7 @@ public class Utils {
private Utils() {}
- public static HTable getTable(UserProvider provider, Configuration config, String tableName)
+ public static Table getTable(UserProvider provider, Configuration config, String tableName)
throws IOException, InterruptedException {
UserGroupInformation ugi;
if (provider != null) {
@@ -74,10 +76,10 @@ public class Utils {
}
}
- return ugi.doAs(new PrivilegedExceptionAction<HTable>() {
+ return ugi.doAs(new PrivilegedExceptionAction<Table>() {
@Override
- public HTable run() throws IOException {
- return new HTable(config, tableName);
+ public Table run() throws IOException {
+ return ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName));
}
});
}
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index d2a73d4..63169fb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -28,10 +28,10 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.storm.hbase.common.Utils;
import org.apache.storm.hbase.security.HBaseSecurityUtil;
@@ -72,7 +72,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
private int partitionNum;
private Options<T> options;
private Serializer<T> serializer;
- private HTable table;
+ private Table table;
/**
* Constructor.
@@ -183,7 +183,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
new Object[]{ this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i))) });
Put put = new Put(hbaseKey);
T val = values.get(i);
- put.add(this.options.columnFamily.getBytes(),
+ put.addColumn(this.options.columnFamily.getBytes(),
qualifier.getBytes(),
this.serializer.serialize(val));
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 086b477..3fe4bc4 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -23,12 +23,15 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.storm.trident.windowing.WindowKryoSerializer;
import org.apache.storm.trident.windowing.WindowsStore;
@@ -41,9 +44,9 @@ import org.slf4j.LoggerFactory;
public class HBaseWindowsStore implements WindowsStore {
public static final String UTF_8 = "utf-8";
private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
- private final ThreadLocal<HTable> threadLocalHtable;
+ private final ThreadLocal<Table> threadLocalHtable;
private final ThreadLocal<WindowKryoSerializer> threadLocalWindowKryoSerializer;
- private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+ private final Queue<Table> htables = new ConcurrentLinkedQueue<>();
private final byte[] family;
private final byte[] qualifier;
@@ -52,13 +55,13 @@ public class HBaseWindowsStore implements WindowsStore {
this.family = family;
this.qualifier = qualifier;
- threadLocalHtable = new ThreadLocal<HTable>() {
+ threadLocalHtable = new ThreadLocal<Table>() {
@Override
- protected HTable initialValue() {
+ protected Table initialValue() {
try {
- HTable hTable = new HTable(config, tableName);
- htables.add(hTable);
- return hTable;
+ Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName));
+ htables.add(table);
+ return table;
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -74,7 +77,7 @@ public class HBaseWindowsStore implements WindowsStore {
}
- private HTable htable() {
+ private Table htable() {
return threadLocalHtable.get();
}
@@ -252,7 +255,7 @@ public class HBaseWindowsStore implements WindowsStore {
@Override
public void shutdown() {
// close all the created hTable instances
- for (HTable htable : htables) {
+ for (Table htable : htables) {
try {
htable.close();
} catch (IOException e) {
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
index 7723916..2fd8ae5 100644
--- a/external/storm-hive/pom.xml
+++ b/external/storm-hive/pom.xml
@@ -154,6 +154,7 @@
<artifactId>java-hamcrest</artifactId>
</dependency>
<dependency>
+ <!-- Needed in this version by hive -->
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3</version>
diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
index 6e732ae..484d3c1 100644
--- a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
+++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
@@ -23,9 +23,11 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -61,7 +63,7 @@ public class WordCountClient {
System.exit(1);
}
- HTable table = new HTable(config, "WordCount");
+ Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf("WordCount"));
String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
for (String word : words) {
diff --git a/pom.xml b/pom.xml
index b8a8740..f359fc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -287,10 +287,10 @@
<mockito.version>2.19.0</mockito.version>
<zookeeper.version>3.4.6</zookeeper.version>
<jline.version>0.9.94</jline.version>
- <hive.version>2.3.3</hive.version>
- <hadoop.version>2.6.1</hadoop.version>
+ <hive.version>2.3.4</hive.version>
+ <hadoop.version>2.8.5</hadoop.version>
<hdfs.version>${hadoop.version}</hdfs.version>
- <hbase.version>1.4.4</hbase.version>
+ <hbase.version>2.1.3</hbase.version>
<kryo.version>3.0.3</kryo.version>
<servlet.version>3.1.0</servlet.version>
<joda-time.version>2.3</joda-time.version>
@@ -301,7 +301,6 @@
<hdrhistogram.version>2.1.10</hdrhistogram.version>
<hamcrest.version>2.0.0.0</hamcrest.version>
<cassandra.version>2.1.7</cassandra.version>
- <druid.version>0.8.2</druid.version>
<elasticsearch.version>5.2.2</elasticsearch.version>
<calcite.version>1.14.0</calcite.version>
<mongodb.version>3.2.0</mongodb.version>
@@ -1084,6 +1083,62 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- Hadoop dependencies. Specified here so HDFS/HBase don't import older versions of these jars in their projects-->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-registry</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-archives</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index aa7714d..896f0c2 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -20,6 +20,7 @@ import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.WaitStrategyPark;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
public class JCQueueTest {
@@ -31,34 +32,32 @@ public class JCQueueTest {
@Test
public void testFirstMessageFirst() throws InterruptedException {
Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
- for (int i = 0; i < 100; i++) {
- JCQueue queue = createQueue("firstMessageOrder", 16);
+ JCQueue queue = createQueue("firstMessageOrder", 16);
- queue.publish("FIRST");
+ queue.publish("FIRST");
- Runnable producer = new IncProducer(queue, i + 100, 1);
+ Runnable producer = new IncProducer(queue, 100, 1);
- final AtomicReference<Object> result = new AtomicReference<>();
- Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
- private boolean head = true;
+ final AtomicReference<Object> result = new AtomicReference<>();
+ Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
+ private boolean head = true;
- @Override
- public void accept(Object event) {
- if (head) {
- head = false;
- result.set(event);
- }
+ @Override
+ public void accept(Object event) {
+ if (head) {
+ head = false;
+ result.set(event);
}
+ }
- @Override
- public void flush() {
- }
- });
+ @Override
+ public void flush() {
+ }
+ });
- run(producer, consumer, queue);
- Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
+ run(producer, consumer, queue);
+ Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
"FIRST", result.get());
- }
});
}