You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 10:40:33 UTC
[pulsar] 03/08: Remove the deprecated api usage in hdfs (#12080)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e0d3d4218a158ba8d76d582e014d7291a824b05c
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun Sep 26 09:23:38 2021 +0800
Remove the deprecated api usage in hdfs (#12080)
### Motivation
Remove the deprecated api usage in hdfs
### Modifications
Use try-with-resources instead of IOUtils.closeQuietly
Remove the deprecated Long constructor call (new Long(...)) and rely on autoboxing instead
* Remove the deprecated api usage in hdfs
* Update pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
Co-authored-by: Yunze Xu <xy...@163.com>
Co-authored-by: Yunze Xu <xy...@163.com>
(cherry picked from commit f8220166a1b9af09a35d926adde90955be225af1)
---
.../java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java | 7 +------
.../apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java | 2 +-
.../java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java | 7 +------
.../apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java | 2 +-
4 files changed, 4 insertions(+), 14 deletions(-)
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
index f1aab8b..456d0e5 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector {
}
InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
- Socket socket = null;
- try {
- socket = socketFactory.createSocket();
+ try (Socket socket = socketFactory.createSocket()) {
NetUtils.connect(socket, namenode, 1000); // 1 second timeout
- } finally {
- IOUtils.closeQuietly(socket);
}
}
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
index 88aee08..a3687b8 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
@@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S
@Override
public KeyValue<Long, String> extractKeyValue(Record<String> record) {
- Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+ Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
return new KeyValue<>(sequence, record.getValue());
}
diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
index 28e0bd4..dfe8833 100644
--- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector {
}
InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
- Socket socket = null;
- try {
- socket = socketFactory.createSocket();
+ try (Socket socket = socketFactory.createSocket()){
NetUtils.connect(socket, namenode, 1000); // 1 second timeout
- } finally {
- IOUtils.closeQuietly(socket);
}
}
diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
index cca5398..d63f360 100644
--- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
@@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S
@Override
public KeyValue<Long, String> extractKeyValue(Record<String> record) {
- Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+ Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
return new KeyValue<>(sequence, record.getValue());
}