You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 10:40:33 UTC

[pulsar] 03/08: Remove the deprecated api usage in hdfs (#12080)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0d3d4218a158ba8d76d582e014d7291a824b05c
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun Sep 26 09:23:38 2021 +0800

    Remove the deprecated api usage in hdfs (#12080)
    
    ### Motivation
    Remove the deprecated api usage in hdfs
    
    ### Modifications
    Use try with resources instead of closeQuitely
    Remove the Long constructor
    
    * Remove the deprecated api usage in hdfs
    
    * Update pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
    
    Co-authored-by: Yunze Xu <xy...@163.com>
    
    Co-authored-by: Yunze Xu <xy...@163.com>
    (cherry picked from commit f8220166a1b9af09a35d926adde90955be225af1)
---
 .../java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java     | 7 +------
 .../apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java    | 2 +-
 .../java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java     | 7 +------
 .../apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java    | 2 +-
 4 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
index f1aab8b..456d0e5 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector {
         }
         InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
         SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
-        Socket socket = null;
-        try {
-            socket = socketFactory.createSocket();
+        try (Socket socket = socketFactory.createSocket()) {
             NetUtils.connect(socket, namenode, 1000); // 1 second timeout
-        } finally {
-            IOUtils.closeQuietly(socket);
         }
     }
 
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
index 88aee08..a3687b8 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
@@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S
 
     @Override
     public KeyValue<Long, String> extractKeyValue(Record<String> record) {
-       Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+       Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
        return new KeyValue<>(sequence, record.getValue());
     }
 
diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
index 28e0bd4..dfe8833 100644
--- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector {
         }
         InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
         SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
-        Socket socket = null;
-        try {
-            socket = socketFactory.createSocket();
+        try (Socket socket = socketFactory.createSocket()){
             NetUtils.connect(socket, namenode, 1000); // 1 second timeout
-        } finally {
-            IOUtils.closeQuietly(socket);
         }
     }
 
diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
index cca5398..d63f360 100644
--- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
@@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S
 
     @Override
     public KeyValue<Long, String> extractKeyValue(Record<String> record) {
-       Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+       Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
        return new KeyValue<>(sequence, record.getValue());
     }