You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/07/28 01:53:18 UTC

[hadoop] branch trunk updated: HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24560f2eb52 HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)
24560f2eb52 is described below

commit 24560f2eb52f4bc4fa47efc4eab6989b08808ae7
Author: xuzq <15...@163.com>
AuthorDate: Thu Jul 28 09:53:05 2022 +0800

    HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)
---
 .../hdfs/qjournal/client/IPCLoggerChannel.java     | 368 +++++++--------------
 1 file changed, 128 insertions(+), 240 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 58c5ad39b99..4b7e59c51f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -23,7 +23,6 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -67,6 +66,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningE
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncaughtExceptionHandlers;
+import org.apache.hadoop.util.Time;
 
 /**
  * Channel to a remote JournalNode using Hadoop IPC.
@@ -154,26 +154,15 @@ public class IPCLoggerChannel implements AsyncLogger {
 
   private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000;
   
-  static final Factory FACTORY = new AsyncLogger.Factory() {
-    @Override
-    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
-        String journalId, String nameServiceId, InetSocketAddress addr) {
-      return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr);
-    }
-  };
+  static final Factory FACTORY = IPCLoggerChannel::new;
 
-  public IPCLoggerChannel(Configuration conf,
-      NamespaceInfo nsInfo,
-      String journalId,
-      InetSocketAddress addr) {
+  public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo,
+      String journalId, InetSocketAddress addr) {
     this(conf, nsInfo, journalId, null, addr);
   }
 
-  public IPCLoggerChannel(Configuration conf,
-                          NamespaceInfo nsInfo,
-                          String journalId,
-                          String nameServiceId,
-                          InetSocketAddress addr) {
+  public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo,
+      String journalId, String nameServiceId, InetSocketAddress addr) {
     this.conf = conf;
     this.nsInfo = nsInfo;
     this.journalId = journalId;
@@ -202,7 +191,7 @@ public class IPCLoggerChannel implements AsyncLogger {
         "Trying to move committed txid backwards in client " +
          "old: %s new: %s", committedTxId, txid);
     this.committedTxId = txid;
-    this.lastCommitNanos = System.nanoTime();
+    this.lastCommitNanos = Time.monotonicNowNanos();
   }
   
   @Override
@@ -229,25 +218,19 @@ public class IPCLoggerChannel implements AsyncLogger {
     final Configuration confCopy = new Configuration(conf);
     
     // Need to set NODELAY or else batches larger than MTU can trigger 
-    // 40ms nagling delays.
-    confCopy.setBoolean(
-        CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
-        true);
-    
+    // 40ms nagling delays.
+    confCopy.setBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, true);
     RPC.setProtocolEngine(confCopy,
         QJournalProtocolPB.class, ProtobufRpcEngine2.class);
     return SecurityUtil.doAsLoginUser(
-        new PrivilegedExceptionAction<QJournalProtocol>() {
-          @Override
-          public QJournalProtocol run() throws IOException {
-            RPC.setProtocolEngine(confCopy,
-                QJournalProtocolPB.class, ProtobufRpcEngine2.class);
-            QJournalProtocolPB pbproxy = RPC.getProxy(
-                QJournalProtocolPB.class,
-                RPC.getProtocolVersion(QJournalProtocolPB.class),
-                addr, confCopy);
-            return new QJournalProtocolTranslatorPB(pbproxy);
-          }
+        (PrivilegedExceptionAction<QJournalProtocol>) () -> {
+          RPC.setProtocolEngine(confCopy,
+              QJournalProtocolPB.class, ProtobufRpcEngine2.class);
+          QJournalProtocolPB pbproxy = RPC.getProxy(
+              QJournalProtocolPB.class,
+              RPC.getProtocolVersion(QJournalProtocolPB.class),
+              addr, confCopy);
+          return new QJournalProtocolTranslatorPB(pbproxy);
         });
   }
   
@@ -260,10 +243,8 @@ public class IPCLoggerChannel implements AsyncLogger {
     return Executors.newSingleThreadExecutor(
         new ThreadFactoryBuilder()
           .setDaemon(true)
-          .setNameFormat("Logger channel (from single-thread executor) to " +
-              addr)
-          .setUncaughtExceptionHandler(
-              UncaughtExceptionHandlers.systemExit())
+          .setNameFormat("Logger channel (from single-thread executor) to " + addr)
+          .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
           .build());
   }
 
@@ -308,11 +289,6 @@ public class IPCLoggerChannel implements AsyncLogger {
         epoch, ipcSerial++, committedTxId);
   }
 
-  @VisibleForTesting
-  synchronized long getNextIpcSerial() {
-    return ipcSerial;
-  }
-
   public synchronized int getQueuedEditsSize() {
     return queuedEditsSizeBytes;
   }
@@ -333,11 +309,7 @@ public class IPCLoggerChannel implements AsyncLogger {
   @VisibleForTesting
   void waitForAllPendingCalls() throws InterruptedException {
     try {
-      singleThreadExecutor.submit(new Runnable() {
-        @Override
-        public void run() {
-        }
-      }).get();
+      singleThreadExecutor.submit(() -> {}).get();
     } catch (ExecutionException e) {
       // This can't happen!
       throw new AssertionError(e);
@@ -346,36 +318,23 @@ public class IPCLoggerChannel implements AsyncLogger {
 
   @Override
   public ListenableFuture<Boolean> isFormatted() {
-    return singleThreadExecutor.submit(new Callable<Boolean>() {
-      @Override
-      public Boolean call() throws IOException {
-        return getProxy().isFormatted(journalId, nameServiceId);
-      }
-    });
+    return singleThreadExecutor.submit(() -> getProxy().isFormatted(journalId, nameServiceId));
   }
 
   @Override
   public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
-    return singleThreadExecutor.submit(new Callable<GetJournalStateResponseProto>() {
-      @Override
-      public GetJournalStateResponseProto call() throws IOException {
-        GetJournalStateResponseProto ret =
-            getProxy().getJournalState(journalId, nameServiceId);
-        constructHttpServerURI(ret);
-        return ret;
-      }
+    return singleThreadExecutor.submit(() -> {
+      GetJournalStateResponseProto ret = getProxy().getJournalState(journalId, nameServiceId);
+      constructHttpServerURI(ret);
+      return ret;
     });
   }
 
   @Override
   public ListenableFuture<NewEpochResponseProto> newEpoch(
       final long epoch) {
-    return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() {
-      @Override
-      public NewEpochResponseProto call() throws IOException {
-        return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch);
-      }
-    });
+    return singleThreadExecutor.submit(
+        () -> getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch));
   }
   
   @Override
@@ -390,50 +349,43 @@ public class IPCLoggerChannel implements AsyncLogger {
     
     // When this batch is acked, we use its submission time in order
     // to calculate how far we are lagging.
-    final long submitNanos = System.nanoTime();
+    final long submitNanos = Time.monotonicNowNanos();
     
     ListenableFuture<Void> ret = null;
     try {
-      ret = singleThreadExecutor.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws IOException {
-          throwIfOutOfSync();
-
-          long rpcSendTimeNanos = System.nanoTime();
-          try {
-            getProxy().journal(createReqInfo(),
-                segmentTxId, firstTxnId, numTxns, data);
-          } catch (IOException e) {
-            QuorumJournalManager.LOG.warn(
-                "Remote journal " + IPCLoggerChannel.this + " failed to " +
-                "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
-                ". Will try to write to this JN again after the next " +
-                "log roll.", e); 
-            synchronized (IPCLoggerChannel.this) {
-              outOfSync = true;
-            }
-            throw e;
-          } finally {
-            long now = System.nanoTime();
-            long rpcTime = TimeUnit.MICROSECONDS.convert(
-                now - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
-            long endToEndTime = TimeUnit.MICROSECONDS.convert(
-                now - submitNanos, TimeUnit.NANOSECONDS);
-            metrics.addWriteEndToEndLatency(endToEndTime);
-            metrics.addWriteRpcLatency(rpcTime);
-            if (rpcTime / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
-              QuorumJournalManager.LOG.warn(
-                  "Took " + (rpcTime / 1000) + "ms to send a batch of " +
-                  numTxns + " edits (" + data.length + " bytes) to " +
-                  "remote journal " + IPCLoggerChannel.this);
-            }
-          }
+      ret = singleThreadExecutor.submit(() -> {
+        throwIfOutOfSync();
+
+        final long rpcSendTimeNanos = Time.monotonicNowNanos();
+        try {
+          getProxy().journal(createReqInfo(), segmentTxId, firstTxnId, numTxns, data);
+        } catch (IOException e) {
+          QuorumJournalManager.LOG.warn("Remote journal {} failed to write txns {}-{}."
+                  + " Will try to write to this JN again after the next log roll.",
+              IPCLoggerChannel.this, firstTxnId, (firstTxnId + numTxns - 1), e);
           synchronized (IPCLoggerChannel.this) {
-            highestAckedTxId = firstTxnId + numTxns - 1;
-            lastAckNanos = submitNanos;
+            outOfSync = true;
           }
-          return null;
+          throw e;
+        } finally {
+          final long nowNanos = Time.monotonicNowNanos();
+          final long rpcTimeMicros = TimeUnit.MICROSECONDS.convert(
+              (nowNanos - rpcSendTimeNanos), TimeUnit.NANOSECONDS);
+          final long endToEndTimeMicros = TimeUnit.MICROSECONDS.convert(
+              (nowNanos - submitNanos), TimeUnit.NANOSECONDS);
+          metrics.addWriteEndToEndLatency(endToEndTimeMicros);
+          metrics.addWriteRpcLatency(rpcTimeMicros);
+          if (rpcTimeMicros / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
+            QuorumJournalManager.LOG.warn(
+                "Took {}ms to send a batch of {} edits ({} bytes) to remote journal {}.",
+                rpcTimeMicros / 1000, numTxns, data.length, IPCLoggerChannel.this);
+          }
+        }
+        synchronized (IPCLoggerChannel.this) {
+          highestAckedTxId = firstTxnId + numTxns - 1;
+          lastAckNanos = submitNanos;
         }
+        return null;
       });
     } finally {
       if (ret == null) {
@@ -460,14 +412,12 @@ public class IPCLoggerChannel implements AsyncLogger {
     return ret;
   }
 
-  private void throwIfOutOfSync()
-      throws JournalOutOfSyncException, IOException {
+  private void throwIfOutOfSync() throws IOException {
     if (isOutOfSync()) {
       // Even if we're out of sync, it's useful to send an RPC
       // to the remote node in order to update its lag metrics, etc.
       heartbeatIfNecessary();
-      throw new JournalOutOfSyncException(
-          "Journal disabled until next roll");
+      throw new JournalOutOfSyncException("Journal disabled until next roll");
     }
   }
 
@@ -497,12 +447,10 @@ public class IPCLoggerChannel implements AsyncLogger {
   private synchronized void reserveQueueSpace(int size)
       throws LoggerTooFarBehindException {
     Preconditions.checkArgument(size >= 0);
-    if (queuedEditsSizeBytes + size > queueSizeLimitBytes &&
-        queuedEditsSizeBytes > 0) {
-      QuorumJournalManager.LOG.warn("Pending edits to " + IPCLoggerChannel.this
-          + " is going to exceed limit size: " + queueSizeLimitBytes
-          + ", current queued edits size: " + queuedEditsSizeBytes
-          + ", will silently drop " + size + " bytes of edits!");
+    if (queuedEditsSizeBytes + size > queueSizeLimitBytes && queuedEditsSizeBytes > 0) {
+      QuorumJournalManager.LOG.warn("Pending edits to {} is going to exceed limit size: {}"
+          + ", current queued edits size: {}, will silently drop {} bytes of edits!",
+          IPCLoggerChannel.this, queueSizeLimitBytes, queuedEditsSizeBytes, size);
       throw new LoggerTooFarBehindException();
     }
     queuedEditsSizeBytes += size;
@@ -514,203 +462,144 @@ public class IPCLoggerChannel implements AsyncLogger {
   }
 
   @Override
-  public ListenableFuture<Void> format(final NamespaceInfo nsInfo,
-      final boolean force) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        getProxy().format(journalId, nameServiceId, nsInfo, force);
-        return null;
-      }
+  public ListenableFuture<Void> format(final NamespaceInfo nsInfo, final boolean force) {
+    return singleThreadExecutor.submit(() -> {
+      getProxy().format(journalId, nameServiceId, nsInfo, force);
+      return null;
     });
   }
   
   @Override
-  public ListenableFuture<Void> startLogSegment(final long txid,
-      final int layoutVersion) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
-        synchronized (IPCLoggerChannel.this) {
-          if (outOfSync) {
-            outOfSync = false;
-            QuorumJournalManager.LOG.info(
-                "Restarting previously-stopped writes to " +
-                IPCLoggerChannel.this + " in segment starting at txid " +
-                txid);
-          }
+  public ListenableFuture<Void> startLogSegment(final long txid, final int layoutVersion) {
+    return singleThreadExecutor.submit(() -> {
+      getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
+      synchronized (IPCLoggerChannel.this) {
+        if (outOfSync) {
+          outOfSync = false;
+          QuorumJournalManager.LOG.info(
+              "Restarting previously-stopped writes to {} in segment starting at txid {}.",
+                  IPCLoggerChannel.this, txid);
         }
-        return null;
       }
+      return null;
     });
   }
   
   @Override
-  public ListenableFuture<Void> finalizeLogSegment(
-      final long startTxId, final long endTxId) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        throwIfOutOfSync();
-        
-        getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
-        return null;
-      }
+  public ListenableFuture<Void> finalizeLogSegment(final long startTxId, final long endTxId) {
+    return singleThreadExecutor.submit(() -> {
+      throwIfOutOfSync();
+      getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
+      return null;
     });
   }
   
   @Override
   public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
-        return null;
-      }
+    return singleThreadExecutor.submit(() -> {
+      getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
+      return null;
     });
   }
 
   @Override
   public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
       long fromTxnId, int maxTransactions) {
-    return parallelExecutor.submit(
-        new Callable<GetJournaledEditsResponseProto>() {
-          @Override
-          public GetJournaledEditsResponseProto call() throws IOException {
-            return getProxy().getJournaledEdits(journalId, nameServiceId,
-                fromTxnId, maxTransactions);
-          }
-        });
+    return parallelExecutor.submit(() -> getProxy().getJournaledEdits(
+        journalId, nameServiceId, fromTxnId, maxTransactions));
   }
 
   @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId, final boolean inProgressOk) {
-    return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
-      @Override
-      public RemoteEditLogManifest call() throws IOException {
-        GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
-            journalId, nameServiceId, fromTxnId, inProgressOk);
-        // Update the http port, since we need this to build URLs to any of the
-        // returned logs.
-        constructHttpServerURI(ret);
-        return PBHelper.convert(ret.getManifest());
-      }
+    return parallelExecutor.submit(() -> {
+      GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
+          journalId, nameServiceId, fromTxnId, inProgressOk);
+      // Update the http port, since we need this to build URLs to any of the
+      // returned logs.
+      constructHttpServerURI(ret);
+      return PBHelper.convert(ret.getManifest());
     });
   }
 
   @Override
-  public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
-      final long segmentTxId) {
-    return singleThreadExecutor.submit(new Callable<PrepareRecoveryResponseProto>() {
-      @Override
-      public PrepareRecoveryResponseProto call() throws IOException {
-        if (!hasHttpServerEndPoint()) {
-          // force an RPC call so we know what the HTTP port should be if it
-          // haven't done so.
-          GetJournalStateResponseProto ret = getProxy().getJournalState(
-              journalId, nameServiceId);
-          constructHttpServerURI(ret);
-        }
-        return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
+  public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(final long segmentTxId) {
+    return singleThreadExecutor.submit(() -> {
+      if (!hasHttpServerEndPoint()) {
+        // force an RPC call, so we know what the HTTP port should be if it
+        // hasn't done so.
+        GetJournalStateResponseProto ret = getProxy().getJournalState(
+            journalId, nameServiceId);
+        constructHttpServerURI(ret);
       }
+      return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
     });
   }
 
   @Override
-  public ListenableFuture<Void> acceptRecovery(
-      final SegmentStateProto log, final URL url) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().acceptRecovery(createReqInfo(), log, url);
-        return null;
-      }
+  public ListenableFuture<Void> acceptRecovery(final SegmentStateProto log, final URL url) {
+    return singleThreadExecutor.submit(() -> {
+      getProxy().acceptRecovery(createReqInfo(), log, url);
+      return null;
     });
   }
   
   @Override
   public ListenableFuture<Void> doPreUpgrade() {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().doPreUpgrade(journalId);
-        return null;
-      }
+    return singleThreadExecutor.submit(() -> {
+      getProxy().doPreUpgrade(journalId);
+      return null;
     });
   }
   
   @Override
   public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().doUpgrade(journalId, sInfo);
-        return null;
-      }
+    return singleThreadExecutor.submit(() -> {
+      getProxy().doUpgrade(journalId, sInfo);
+      return null;
     });
   }
   
   @Override
   public ListenableFuture<Void> doFinalize() {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().doFinalize(journalId, nameServiceId);
-        return null;
-      }
+    return singleThreadExecutor.submit(() -> {
+      getProxy().doFinalize(journalId, nameServiceId);
+      return null;
     });
   }
   
   @Override
   public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
       final StorageInfo prevStorage, final int targetLayoutVersion) {
-    return singleThreadExecutor.submit(new Callable<Boolean>() {
-      @Override
-      public Boolean call() throws IOException {
-        return getProxy().canRollBack(journalId, nameServiceId,
-            storage, prevStorage, targetLayoutVersion);
-      }
-    });
+    return singleThreadExecutor.submit(
+        () -> getProxy().canRollBack(journalId, nameServiceId,
+            storage, prevStorage, targetLayoutVersion));
   }
 
   @Override
   public ListenableFuture<Void> doRollback() {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().doRollback(journalId, nameServiceId);
-        return null;
-      }
+    return singleThreadExecutor.submit(() -> {
+      getProxy().doRollback(journalId, nameServiceId);
+      return null;
     });
   }
 
   @Override
   public ListenableFuture<Void> discardSegments(final long startTxId) {
-    return singleThreadExecutor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        getProxy().discardSegments(journalId, nameServiceId, startTxId);
-        return null;
-      }
+    return singleThreadExecutor.submit(() -> {
+      getProxy().discardSegments(journalId, nameServiceId, startTxId);
+      return null;
     });
   }
 
   @Override
   public ListenableFuture<Long> getJournalCTime() {
-    return singleThreadExecutor.submit(new Callable<Long>() {
-      @Override
-      public Long call() throws IOException {
-        return getProxy().getJournalCTime(journalId, nameServiceId);
-      }
-    });
+    return singleThreadExecutor.submit(() -> getProxy().getJournalCTime(journalId, nameServiceId));
   }
 
   @Override
   public String toString() {
-    return InetAddresses.toAddrString(addr.getAddress()) + ':' +
-        addr.getPort();
+    return InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort();
   }
 
   @Override
@@ -778,5 +667,4 @@ public class IPCLoggerChannel implements AsyncLogger {
   private boolean hasHttpServerEndPoint() {
    return httpServerURL != null;
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org