You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/03/07 06:13:34 UTC

[storm] 02/07: STORM-3349: Upgrade Hadoop, Hive, HDFS, HBase to latest compatible versions

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 41fe57a9a65f8b0007d327f01442ecb8d0982521
Author: Stig Rohde Døssing <sd...@it-minds.dk>
AuthorDate: Wed Feb 27 13:26:06 2019 +0100

    STORM-3349: Upgrade Hadoop, Hive, HDFS, HBase to latest compatible versions
---
 .../storm/hbase/topology/WordCountClient.java      |  6 ++-
 examples/storm-jms-examples/pom.xml                |  2 +-
 external/storm-autocreds/pom.xml                   | 16 ++++++
 .../org/apache/storm/hbase/common/HBaseClient.java | 14 ++---
 .../java/org/apache/storm/hbase/common/Utils.java  | 12 +++--
 .../storm/hbase/trident/state/HBaseMapState.java   |  6 +--
 .../hbase/trident/windowing/HBaseWindowsStore.java | 21 ++++----
 external/storm-hive/pom.xml                        |  1 +
 .../storm/flux/examples/WordCountClient.java       |  6 ++-
 pom.xml                                            | 63 ++++++++++++++++++++--
 .../jvm/org/apache/storm/utils/JCQueueTest.java    | 39 +++++++-------
 11 files changed, 133 insertions(+), 53 deletions(-)

diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
index b7874bd..33acfab 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
@@ -14,9 +14,11 @@ package org.apache.storm.hbase.topology;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -32,7 +34,7 @@ public class WordCountClient {
             config.set("hbase.rootdir", args[0]);
         }
 
-        HTable table = new HTable(config, "WordCount");
+        Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf("WordCount"));
 
 
         for (String word : WordSpout.words) {
diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml
index 10266c1..886066c 100644
--- a/examples/storm-jms-examples/pom.xml
+++ b/examples/storm-jms-examples/pom.xml
@@ -29,7 +29,7 @@
     <artifactId>storm-jms-examples</artifactId>
 
     <properties>
-        <spring.version>5.0.4.RELEASE</spring.version>
+        <spring.version>5.1.5.RELEASE</spring.version>
     </properties>
     <dependencies>
         <dependency>
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
index 6c69740..d819145 100644
--- a/external/storm-autocreds/pom.xml
+++ b/external/storm-autocreds/pom.xml
@@ -108,6 +108,22 @@
             </exclusions>
         </dependency>
         <dependency>
+            <!-- Needed for TokenUtil, which moved here from hbase-client -->
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hive.hcatalog</groupId>
             <artifactId>hive-hcatalog-streaming</artifactId>
             <version>${hive.version}</version>
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
index 208e622..2d0c324 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
@@ -22,13 +22,13 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.security.HBaseSecurityUtil;
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class HBaseClient implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
 
-    private HTable table;
+    private Table table;
 
     public HBaseClient(Map<String, Object> map, final Configuration configuration, final String tableName) {
         try {
@@ -60,14 +60,14 @@ public class HBaseClient implements Closeable {
             put.setDurability(durability);
             for (ColumnList.Column col : cols.getColumns()) {
                 if (col.getTs() > 0) {
-                    put.add(
+                    put.addColumn(
                         col.getFamily(),
                         col.getQualifier(),
                         col.getTs(),
                         col.getValue()
                     );
                 } else {
-                    put.add(
+                    put.addColumn(
                         col.getFamily(),
                         col.getQualifier(),
                         col.getValue()
@@ -82,9 +82,9 @@ public class HBaseClient implements Closeable {
             inc.setDurability(durability);
             for (ColumnList.Counter cnt : cols.getCounters()) {
                 inc.addColumn(
-                    cnt.getFamily(),
-                    cnt.getQualifier(),
-                    cnt.getIncrement()
+                        cnt.getFamily(),
+                        cnt.getQualifier(),
+                        cnt.getIncrement()
                 );
             }
             mutations.add(inc);
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
index f9e6e34..defa092 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
@@ -16,7 +16,9 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.security.PrivilegedExceptionAction;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,7 +33,7 @@ public class Utils {
 
     private Utils() {}
 
-    public static HTable getTable(UserProvider provider, Configuration config, String tableName)
+    public static Table getTable(UserProvider provider, Configuration config, String tableName)
         throws IOException, InterruptedException {
         UserGroupInformation ugi;
         if (provider != null) {
@@ -74,10 +76,10 @@ public class Utils {
             }
         }
 
-        return ugi.doAs(new PrivilegedExceptionAction<HTable>() {
+        return ugi.doAs(new PrivilegedExceptionAction<Table>() {
             @Override
-            public HTable run() throws IOException {
-                return new HTable(config, tableName);
+            public Table run() throws IOException {
+                return ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName));
             }
         });
     }
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index d2a73d4..63169fb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -28,10 +28,10 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.storm.hbase.common.Utils;
 import org.apache.storm.hbase.security.HBaseSecurityUtil;
@@ -72,7 +72,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
     private int partitionNum;
     private Options<T> options;
     private Serializer<T> serializer;
-    private HTable table;
+    private Table table;
 
     /**
      * Constructor.
@@ -183,7 +183,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
                      new Object[]{ this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i))) });
             Put put = new Put(hbaseKey);
             T val = values.get(i);
-            put.add(this.options.columnFamily.getBytes(),
+            put.addColumn(this.options.columnFamily.getBytes(),
                     qualifier.getBytes(),
                     this.serializer.serialize(val));
 
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 086b477..3fe4bc4 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -23,12 +23,15 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.storm.trident.windowing.WindowKryoSerializer;
 import org.apache.storm.trident.windowing.WindowsStore;
@@ -41,9 +44,9 @@ import org.slf4j.LoggerFactory;
 public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
     private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
-    private final ThreadLocal<HTable> threadLocalHtable;
+    private final ThreadLocal<Table> threadLocalHtable;
     private final ThreadLocal<WindowKryoSerializer> threadLocalWindowKryoSerializer;
-    private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final Queue<Table> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 
@@ -52,13 +55,13 @@ public class HBaseWindowsStore implements WindowsStore {
         this.family = family;
         this.qualifier = qualifier;
 
-        threadLocalHtable = new ThreadLocal<HTable>() {
+        threadLocalHtable = new ThreadLocal<Table>() {
             @Override
-            protected HTable initialValue() {
+            protected Table initialValue() {
                 try {
-                    HTable hTable = new HTable(config, tableName);
-                    htables.add(hTable);
-                    return hTable;
+                    Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName));
+                    htables.add(table);
+                    return table;
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -74,7 +77,7 @@ public class HBaseWindowsStore implements WindowsStore {
 
     }
 
-    private HTable htable() {
+    private Table htable() {
         return threadLocalHtable.get();
     }
 
@@ -252,7 +255,7 @@ public class HBaseWindowsStore implements WindowsStore {
     @Override
     public void shutdown() {
         // close all the created hTable instances
-        for (HTable htable : htables) {
+        for (Table htable : htables) {
             try {
                 htable.close();
             } catch (IOException e) {
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
index 7723916..2fd8ae5 100644
--- a/external/storm-hive/pom.xml
+++ b/external/storm-hive/pom.xml
@@ -154,6 +154,7 @@
         <artifactId>java-hamcrest</artifactId>
     </dependency>
     <dependency>
+        <!-- Needed in this version by hive -->
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
       <version>0.9.3</version>
diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
index 6e732ae..484d3c1 100644
--- a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
+++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
@@ -23,9 +23,11 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -61,7 +63,7 @@ public class WordCountClient {
             System.exit(1);
         }
 
-        HTable table = new HTable(config, "WordCount");
+        Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf("WordCount"));
         String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
 
         for (String word : words) {
diff --git a/pom.xml b/pom.xml
index b8a8740..f359fc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -287,10 +287,10 @@
         <mockito.version>2.19.0</mockito.version>
         <zookeeper.version>3.4.6</zookeeper.version>
         <jline.version>0.9.94</jline.version>
-        <hive.version>2.3.3</hive.version>
-        <hadoop.version>2.6.1</hadoop.version>
+        <hive.version>2.3.4</hive.version>
+        <hadoop.version>2.8.5</hadoop.version>
         <hdfs.version>${hadoop.version}</hdfs.version>
-        <hbase.version>1.4.4</hbase.version>
+        <hbase.version>2.1.3</hbase.version>
         <kryo.version>3.0.3</kryo.version>
         <servlet.version>3.1.0</servlet.version>
         <joda-time.version>2.3</joda-time.version>
@@ -301,7 +301,6 @@
         <hdrhistogram.version>2.1.10</hdrhistogram.version>
         <hamcrest.version>2.0.0.0</hamcrest.version>
         <cassandra.version>2.1.7</cassandra.version>
-        <druid.version>0.8.2</druid.version>
         <elasticsearch.version>5.2.2</elasticsearch.version>
         <calcite.version>1.14.0</calcite.version>
         <mongodb.version>3.2.0</mongodb.version>
@@ -1084,6 +1083,62 @@
                    </exclusion>
                 </exclusions>
            </dependency>
+           <!-- Hadoop dependencies. Specified here so HDFS/HBase don't import older versions of these jars in their projects-->
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-auth</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-hdfs</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-common</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-annotations</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-mapreduce-client-core</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-yarn-common</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-yarn-registry</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-archives</artifactId>
+               <version>${hadoop.version}</version>
+           </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index aa7714d..896f0c2 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -20,6 +20,7 @@ import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 public class JCQueueTest {
@@ -31,34 +32,32 @@ public class JCQueueTest {
     @Test
     public void testFirstMessageFirst() throws InterruptedException {
         Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
-            for (int i = 0; i < 100; i++) {
-                JCQueue queue = createQueue("firstMessageOrder", 16);
+            JCQueue queue = createQueue("firstMessageOrder", 16);
 
-                queue.publish("FIRST");
+            queue.publish("FIRST");
 
-                Runnable producer = new IncProducer(queue, i + 100, 1);
+            Runnable producer = new IncProducer(queue, 100, 1);
 
-                final AtomicReference<Object> result = new AtomicReference<>();
-                Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
-                    private boolean head = true;
+            final AtomicReference<Object> result = new AtomicReference<>();
+            Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
+                private boolean head = true;
 
-                    @Override
-                    public void accept(Object event) {
-                        if (head) {
-                            head = false;
-                            result.set(event);
-                        }
+                @Override
+                public void accept(Object event) {
+                    if (head) {
+                        head = false;
+                        result.set(event);
                     }
+                }
 
-                    @Override
-                    public void flush() {
-                    }
-                });
+                @Override
+                public void flush() {
+                }
+            });
 
-                run(producer, consumer, queue);
-                Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
+            run(producer, consumer, queue);
+            Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
                     "FIRST", result.get());
-            }
         });
     }