You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/07/28 01:53:18 UTC
[hadoop] branch trunk updated: HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24560f2eb52 HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)
24560f2eb52 is described below
commit 24560f2eb52f4bc4fa47efc4eab6989b08808ae7
Author: xuzq <15...@163.com>
AuthorDate: Thu Jul 28 09:53:05 2022 +0800
HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)
---
.../hdfs/qjournal/client/IPCLoggerChannel.java | 368 +++++++--------------
1 file changed, 128 insertions(+), 240 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 58c5ad39b99..4b7e59c51f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -23,7 +23,6 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -67,6 +66,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningE
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncaughtExceptionHandlers;
+import org.apache.hadoop.util.Time;
/**
* Channel to a remote JournalNode using Hadoop IPC.
@@ -154,26 +154,15 @@ public class IPCLoggerChannel implements AsyncLogger {
private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000;
- static final Factory FACTORY = new AsyncLogger.Factory() {
- @Override
- public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
- String journalId, String nameServiceId, InetSocketAddress addr) {
- return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr);
- }
- };
+ static final Factory FACTORY = IPCLoggerChannel::new;
- public IPCLoggerChannel(Configuration conf,
- NamespaceInfo nsInfo,
- String journalId,
- InetSocketAddress addr) {
+ public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
this(conf, nsInfo, journalId, null, addr);
}
- public IPCLoggerChannel(Configuration conf,
- NamespaceInfo nsInfo,
- String journalId,
- String nameServiceId,
- InetSocketAddress addr) {
+ public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, String nameServiceId, InetSocketAddress addr) {
this.conf = conf;
this.nsInfo = nsInfo;
this.journalId = journalId;
@@ -202,7 +191,7 @@ public class IPCLoggerChannel implements AsyncLogger {
"Trying to move committed txid backwards in client " +
"old: %s new: %s", committedTxId, txid);
this.committedTxId = txid;
- this.lastCommitNanos = System.nanoTime();
+ this.lastCommitNanos = Time.monotonicNowNanos();
}
@Override
@@ -229,25 +218,19 @@ public class IPCLoggerChannel implements AsyncLogger {
final Configuration confCopy = new Configuration(conf);
// Need to set NODELAY or else batches larger than MTU can trigger
- // 40ms nagling delays.
- confCopy.setBoolean(
- CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
- true);
-
+ // 40ms nailing delays.
+ confCopy.setBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, true);
RPC.setProtocolEngine(confCopy,
QJournalProtocolPB.class, ProtobufRpcEngine2.class);
return SecurityUtil.doAsLoginUser(
- new PrivilegedExceptionAction<QJournalProtocol>() {
- @Override
- public QJournalProtocol run() throws IOException {
- RPC.setProtocolEngine(confCopy,
- QJournalProtocolPB.class, ProtobufRpcEngine2.class);
- QJournalProtocolPB pbproxy = RPC.getProxy(
- QJournalProtocolPB.class,
- RPC.getProtocolVersion(QJournalProtocolPB.class),
- addr, confCopy);
- return new QJournalProtocolTranslatorPB(pbproxy);
- }
+ (PrivilegedExceptionAction<QJournalProtocol>) () -> {
+ RPC.setProtocolEngine(confCopy,
+ QJournalProtocolPB.class, ProtobufRpcEngine2.class);
+ QJournalProtocolPB pbproxy = RPC.getProxy(
+ QJournalProtocolPB.class,
+ RPC.getProtocolVersion(QJournalProtocolPB.class),
+ addr, confCopy);
+ return new QJournalProtocolTranslatorPB(pbproxy);
});
}
@@ -260,10 +243,8 @@ public class IPCLoggerChannel implements AsyncLogger {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat("Logger channel (from single-thread executor) to " +
- addr)
- .setUncaughtExceptionHandler(
- UncaughtExceptionHandlers.systemExit())
+ .setNameFormat("Logger channel (from single-thread executor) to " + addr)
+ .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
.build());
}
@@ -308,11 +289,6 @@ public class IPCLoggerChannel implements AsyncLogger {
epoch, ipcSerial++, committedTxId);
}
- @VisibleForTesting
- synchronized long getNextIpcSerial() {
- return ipcSerial;
- }
-
public synchronized int getQueuedEditsSize() {
return queuedEditsSizeBytes;
}
@@ -333,11 +309,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@VisibleForTesting
void waitForAllPendingCalls() throws InterruptedException {
try {
- singleThreadExecutor.submit(new Runnable() {
- @Override
- public void run() {
- }
- }).get();
+ singleThreadExecutor.submit(() -> {}).get();
} catch (ExecutionException e) {
// This can't happen!
throw new AssertionError(e);
@@ -346,36 +318,23 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override
public ListenableFuture<Boolean> isFormatted() {
- return singleThreadExecutor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws IOException {
- return getProxy().isFormatted(journalId, nameServiceId);
- }
- });
+ return singleThreadExecutor.submit(() -> getProxy().isFormatted(journalId, nameServiceId));
}
@Override
public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
- return singleThreadExecutor.submit(new Callable<GetJournalStateResponseProto>() {
- @Override
- public GetJournalStateResponseProto call() throws IOException {
- GetJournalStateResponseProto ret =
- getProxy().getJournalState(journalId, nameServiceId);
- constructHttpServerURI(ret);
- return ret;
- }
+ return singleThreadExecutor.submit(() -> {
+ GetJournalStateResponseProto ret = getProxy().getJournalState(journalId, nameServiceId);
+ constructHttpServerURI(ret);
+ return ret;
});
}
@Override
public ListenableFuture<NewEpochResponseProto> newEpoch(
final long epoch) {
- return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() {
- @Override
- public NewEpochResponseProto call() throws IOException {
- return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch);
- }
- });
+ return singleThreadExecutor.submit(
+ () -> getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch));
}
@Override
@@ -390,50 +349,43 @@ public class IPCLoggerChannel implements AsyncLogger {
// When this batch is acked, we use its submission time in order
// to calculate how far we are lagging.
- final long submitNanos = System.nanoTime();
+ final long submitNanos = Time.monotonicNowNanos();
ListenableFuture<Void> ret = null;
try {
- ret = singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- throwIfOutOfSync();
-
- long rpcSendTimeNanos = System.nanoTime();
- try {
- getProxy().journal(createReqInfo(),
- segmentTxId, firstTxnId, numTxns, data);
- } catch (IOException e) {
- QuorumJournalManager.LOG.warn(
- "Remote journal " + IPCLoggerChannel.this + " failed to " +
- "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
- ". Will try to write to this JN again after the next " +
- "log roll.", e);
- synchronized (IPCLoggerChannel.this) {
- outOfSync = true;
- }
- throw e;
- } finally {
- long now = System.nanoTime();
- long rpcTime = TimeUnit.MICROSECONDS.convert(
- now - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
- long endToEndTime = TimeUnit.MICROSECONDS.convert(
- now - submitNanos, TimeUnit.NANOSECONDS);
- metrics.addWriteEndToEndLatency(endToEndTime);
- metrics.addWriteRpcLatency(rpcTime);
- if (rpcTime / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
- QuorumJournalManager.LOG.warn(
- "Took " + (rpcTime / 1000) + "ms to send a batch of " +
- numTxns + " edits (" + data.length + " bytes) to " +
- "remote journal " + IPCLoggerChannel.this);
- }
- }
+ ret = singleThreadExecutor.submit(() -> {
+ throwIfOutOfSync();
+
+ final long rpcSendTimeNanos = Time.monotonicNowNanos();
+ try {
+ getProxy().journal(createReqInfo(), segmentTxId, firstTxnId, numTxns, data);
+ } catch (IOException e) {
+ QuorumJournalManager.LOG.warn("Remote journal {} failed to write txns {}-{}."
+ + " Will try to write to this JN again after the next log roll.",
+ IPCLoggerChannel.this, firstTxnId, (firstTxnId + numTxns - 1), e);
synchronized (IPCLoggerChannel.this) {
- highestAckedTxId = firstTxnId + numTxns - 1;
- lastAckNanos = submitNanos;
+ outOfSync = true;
}
- return null;
+ throw e;
+ } finally {
+ final long nowNanos = Time.monotonicNowNanos();
+ final long rpcTimeMicros = TimeUnit.MICROSECONDS.convert(
+ (nowNanos - rpcSendTimeNanos), TimeUnit.NANOSECONDS);
+ final long endToEndTimeMicros = TimeUnit.MICROSECONDS.convert(
+ (nowNanos - submitNanos), TimeUnit.NANOSECONDS);
+ metrics.addWriteEndToEndLatency(endToEndTimeMicros);
+ metrics.addWriteRpcLatency(rpcTimeMicros);
+ if (rpcTimeMicros / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
+ QuorumJournalManager.LOG.warn(
+ "Took {}ms to send a batch of {} edits ({} bytes) to remote journal {}.",
+ rpcTimeMicros / 1000, numTxns, data.length, IPCLoggerChannel.this);
+ }
+ }
+ synchronized (IPCLoggerChannel.this) {
+ highestAckedTxId = firstTxnId + numTxns - 1;
+ lastAckNanos = submitNanos;
}
+ return null;
});
} finally {
if (ret == null) {
@@ -460,14 +412,12 @@ public class IPCLoggerChannel implements AsyncLogger {
return ret;
}
- private void throwIfOutOfSync()
- throws JournalOutOfSyncException, IOException {
+ private void throwIfOutOfSync() throws IOException {
if (isOutOfSync()) {
// Even if we're out of sync, it's useful to send an RPC
// to the remote node in order to update its lag metrics, etc.
heartbeatIfNecessary();
- throw new JournalOutOfSyncException(
- "Journal disabled until next roll");
+ throw new JournalOutOfSyncException("Journal disabled until next roll");
}
}
@@ -497,12 +447,10 @@ public class IPCLoggerChannel implements AsyncLogger {
private synchronized void reserveQueueSpace(int size)
throws LoggerTooFarBehindException {
Preconditions.checkArgument(size >= 0);
- if (queuedEditsSizeBytes + size > queueSizeLimitBytes &&
- queuedEditsSizeBytes > 0) {
- QuorumJournalManager.LOG.warn("Pending edits to " + IPCLoggerChannel.this
- + " is going to exceed limit size: " + queueSizeLimitBytes
- + ", current queued edits size: " + queuedEditsSizeBytes
- + ", will silently drop " + size + " bytes of edits!");
+ if (queuedEditsSizeBytes + size > queueSizeLimitBytes && queuedEditsSizeBytes > 0) {
+ QuorumJournalManager.LOG.warn("Pending edits to {} is going to exceed limit size: {}"
+ + ", current queued edits size: {}, will silently drop {} bytes of edits!",
+ IPCLoggerChannel.class, queueSizeLimitBytes, queuedEditsSizeBytes, size);
throw new LoggerTooFarBehindException();
}
queuedEditsSizeBytes += size;
@@ -514,203 +462,144 @@ public class IPCLoggerChannel implements AsyncLogger {
}
@Override
- public ListenableFuture<Void> format(final NamespaceInfo nsInfo,
- final boolean force) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- getProxy().format(journalId, nameServiceId, nsInfo, force);
- return null;
- }
+ public ListenableFuture<Void> format(final NamespaceInfo nsInfo, final boolean force) {
+ return singleThreadExecutor.submit(() -> {
+ getProxy().format(journalId, nameServiceId, nsInfo, force);
+ return null;
});
}
@Override
- public ListenableFuture<Void> startLogSegment(final long txid,
- final int layoutVersion) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
- synchronized (IPCLoggerChannel.this) {
- if (outOfSync) {
- outOfSync = false;
- QuorumJournalManager.LOG.info(
- "Restarting previously-stopped writes to " +
- IPCLoggerChannel.this + " in segment starting at txid " +
- txid);
- }
+ public ListenableFuture<Void> startLogSegment(final long txid, final int layoutVersion) {
+ return singleThreadExecutor.submit(() -> {
+ getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
+ synchronized (IPCLoggerChannel.this) {
+ if (outOfSync) {
+ outOfSync = false;
+ QuorumJournalManager.LOG.info(
+ "Restarting previously-stopped writes to {} in segment starting at txid {}.",
+ IPCLoggerChannel.class, txid);
}
- return null;
}
+ return null;
});
}
@Override
- public ListenableFuture<Void> finalizeLogSegment(
- final long startTxId, final long endTxId) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- throwIfOutOfSync();
-
- getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
- return null;
- }
+ public ListenableFuture<Void> finalizeLogSegment(final long startTxId, final long endTxId) {
+ return singleThreadExecutor.submit(() -> {
+ throwIfOutOfSync();
+ getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
+ return null;
});
}
@Override
public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
- return null;
- }
+ return singleThreadExecutor.submit(() -> {
+ getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
+ return null;
});
}
@Override
public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
long fromTxnId, int maxTransactions) {
- return parallelExecutor.submit(
- new Callable<GetJournaledEditsResponseProto>() {
- @Override
- public GetJournaledEditsResponseProto call() throws IOException {
- return getProxy().getJournaledEdits(journalId, nameServiceId,
- fromTxnId, maxTransactions);
- }
- });
+ return parallelExecutor.submit(() -> getProxy().getJournaledEdits(
+ journalId, nameServiceId, fromTxnId, maxTransactions));
}
@Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId, final boolean inProgressOk) {
- return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
- @Override
- public RemoteEditLogManifest call() throws IOException {
- GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
- journalId, nameServiceId, fromTxnId, inProgressOk);
- // Update the http port, since we need this to build URLs to any of the
- // returned logs.
- constructHttpServerURI(ret);
- return PBHelper.convert(ret.getManifest());
- }
+ return parallelExecutor.submit(() -> {
+ GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
+ journalId, nameServiceId, fromTxnId, inProgressOk);
+ // Update the http port, since we need this to build URLs to any of the
+ // returned logs.
+ constructHttpServerURI(ret);
+ return PBHelper.convert(ret.getManifest());
});
}
@Override
- public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
- final long segmentTxId) {
- return singleThreadExecutor.submit(new Callable<PrepareRecoveryResponseProto>() {
- @Override
- public PrepareRecoveryResponseProto call() throws IOException {
- if (!hasHttpServerEndPoint()) {
- // force an RPC call so we know what the HTTP port should be if it
- // haven't done so.
- GetJournalStateResponseProto ret = getProxy().getJournalState(
- journalId, nameServiceId);
- constructHttpServerURI(ret);
- }
- return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
+ public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(final long segmentTxId) {
+ return singleThreadExecutor.submit(() -> {
+ if (!hasHttpServerEndPoint()) {
+ // force an RPC call, so we know what the HTTP port should be if it
+ // hasn't done so.
+ GetJournalStateResponseProto ret = getProxy().getJournalState(
+ journalId, nameServiceId);
+ constructHttpServerURI(ret);
}
+ return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
});
}
@Override
- public ListenableFuture<Void> acceptRecovery(
- final SegmentStateProto log, final URL url) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().acceptRecovery(createReqInfo(), log, url);
- return null;
- }
+ public ListenableFuture<Void> acceptRecovery(final SegmentStateProto log, final URL url) {
+ return singleThreadExecutor.submit(() -> {
+ getProxy().acceptRecovery(createReqInfo(), log, url);
+ return null;
});
}
@Override
public ListenableFuture<Void> doPreUpgrade() {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().doPreUpgrade(journalId);
- return null;
- }
+ return singleThreadExecutor.submit(() -> {
+ getProxy().doPreUpgrade(journalId);
+ return null;
});
}
@Override
public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().doUpgrade(journalId, sInfo);
- return null;
- }
+ return singleThreadExecutor.submit(() -> {
+ getProxy().doUpgrade(journalId, sInfo);
+ return null;
});
}
@Override
public ListenableFuture<Void> doFinalize() {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().doFinalize(journalId, nameServiceId);
- return null;
- }
+ return singleThreadExecutor.submit(() -> {
+ getProxy().doFinalize(journalId, nameServiceId);
+ return null;
});
}
@Override
public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
final StorageInfo prevStorage, final int targetLayoutVersion) {
- return singleThreadExecutor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws IOException {
- return getProxy().canRollBack(journalId, nameServiceId,
- storage, prevStorage, targetLayoutVersion);
- }
- });
+ return singleThreadExecutor.submit(
+ () -> getProxy().canRollBack(journalId, nameServiceId,
+ storage, prevStorage, targetLayoutVersion));
}
@Override
public ListenableFuture<Void> doRollback() {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().doRollback(journalId, nameServiceId);
- return null;
- }
+ return singleThreadExecutor.submit(() -> {
+ getProxy().doRollback(journalId, nameServiceId);
+ return null;
});
}
@Override
public ListenableFuture<Void> discardSegments(final long startTxId) {
- return singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- getProxy().discardSegments(journalId, nameServiceId, startTxId);
- return null;
- }
+ return singleThreadExecutor.submit(() -> {
+ getProxy().discardSegments(journalId, nameServiceId, startTxId);
+ return null;
});
}
@Override
public ListenableFuture<Long> getJournalCTime() {
- return singleThreadExecutor.submit(new Callable<Long>() {
- @Override
- public Long call() throws IOException {
- return getProxy().getJournalCTime(journalId, nameServiceId);
- }
- });
+ return singleThreadExecutor.submit(() -> getProxy().getJournalCTime(journalId, nameServiceId));
}
@Override
public String toString() {
- return InetAddresses.toAddrString(addr.getAddress()) + ':' +
- addr.getPort();
+ return InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort();
}
@Override
@@ -778,5 +667,4 @@ public class IPCLoggerChannel implements AsyncLogger {
private boolean hasHttpServerEndPoint() {
return httpServerURL != null;
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org