Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/09 01:46:11 UTC
[1/4] hadoop git commit: HDFS-7446. HDFS inotify should have the
ability to determine what txid it has read up to (cmccabe) (cherry picked
from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6.1 8ed162bcb -> 33537078a
HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe)
(cherry picked from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)
(cherry picked from commit 06552a15d5172a2b0ad3d61aa7f9a849857385aa)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43631451
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43631451
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43631451
Branch: refs/heads/branch-2.6.1
Commit: 4363145128f91b2fb1f1c0254ee5e8621a1ac383
Parents: 8ed162b
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Nov 25 17:44:34 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 8 15:32:17 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 65 ++--
.../apache/hadoop/hdfs/inotify/EventBatch.java | 41 +++
.../hadoop/hdfs/inotify/EventBatchList.java | 63 ++++
.../apache/hadoop/hdfs/inotify/EventsList.java | 63 ----
.../hadoop/hdfs/protocol/ClientProtocol.java | 8 +-
.../ClientNamenodeProtocolTranslatorPB.java | 4 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 341 ++++++++++---------
.../namenode/InotifyFSEditLogOpTranslator.java | 74 ++--
.../hdfs/server/namenode/NameNodeRpcServer.java | 45 +--
.../hadoop-hdfs/src/main/proto/inotify.proto | 10 +-
.../hdfs/TestDFSInotifyEventInputStream.java | 209 +++++++-----
12 files changed, 526 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d2f07c2..1f6ce36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -156,6 +156,9 @@ Release 2.6.1 - UNRELEASED
HDFS-7980. Incremental BlockReport will dramatically slow down namenode
startup. (Walter Su via szetszwo)
+ HDFS-7446. HDFS inotify should have the ability to determine what txid it
+ has read up to (cmccabe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
index 73c5f55..83b92b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
@@ -19,11 +19,10 @@
package org.apache.hadoop.hdfs;
import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
@@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* Stream for reading inotify events. DFSInotifyEventInputStreams should not
@@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream {
.class);
private final ClientProtocol namenode;
- private Iterator<Event> it;
+ private Iterator<EventBatch> it;
private long lastReadTxid;
/**
* The most recent txid the NameNode told us it has sync'ed -- helps us
@@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream {
}
/**
- * Returns the next event in the stream or null if no new events are currently
- * available.
+ * Returns the next batch of events in the stream or null if no new
+ * batches are currently available.
*
* @throws IOException because of network error or edit log
* corruption. Also possible if JournalNodes are unresponsive in the
* QJM setting (even one unresponsive JournalNode is enough in rare cases),
* so catching this exception and retrying at least a few times is
* recommended.
- * @throws MissingEventsException if we cannot return the next event in the
- * stream because the data for the event (and possibly some subsequent events)
- * has been deleted (generally because this stream is a very large number of
- * events behind the current state of the NameNode). It is safe to continue
- * reading from the stream after this exception is thrown -- the next
- * available event will be returned.
+ * @throws MissingEventsException if we cannot return the next batch in the
+ * stream because the data for the events (and possibly some subsequent
+ * events) has been deleted (generally because this stream is a very large
+ * number of transactions behind the current state of the NameNode). It is
+ * safe to continue reading from the stream after this exception is thrown.
+ * The next available batch of events will be returned.
*/
- public Event poll() throws IOException, MissingEventsException {
+ public EventBatch poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
@@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream {
return null;
}
if (!it.hasNext()) {
- EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
+ EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
- it = el.getEvents().iterator();
+ it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
@@ -131,18 +124,18 @@ public class DFSInotifyEventInputStream {
}
/**
- * Return a estimate of how many events behind the NameNode's current state
- * this stream is. Clients should periodically call this method and check if
- * its result is steadily increasing, which indicates that they are falling
- * behind (i.e. events are being generated faster than the client is reading
- * them). If a client falls too far behind events may be deleted before the
- * client can read them.
+ * Return an estimate of how many transaction IDs behind the NameNode's
+ * current state this stream is. Clients should periodically call this method
+ * and check if its result is steadily increasing, which indicates that they
+ * are falling behind (i.e. transactions are being generated faster than the
+ * client is reading them). If a client falls too far behind events may be
+ * deleted before the client can read them.
* <p/>
* A return value of -1 indicates that an estimate could not be produced, and
* should be ignored. The value returned by this method is really only useful
* when compared to previous or subsequent returned values.
*/
- public long getEventsBehindEstimate() {
+ public long getTxidsBehindEstimate() {
if (syncTxid == 0) {
return -1;
} else {
@@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream {
}
/**
- * Returns the next event in the stream, waiting up to the specified amount of
- * time for a new event. Returns null if a new event is not available at the
+ * Returns the next event batch in the stream, waiting up to the specified
+ * amount of time for a new batch. Returns null if one is not available at the
* end of the specified amount of time. The time before the method returns may
* exceed the specified amount of time by up to the time required for an RPC
* to the NameNode.
@@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream {
* see {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
- public Event poll(long time, TimeUnit tu) throws IOException,
+ public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
- Event next = null;
+ EventBatch next = null;
while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) {
@@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream {
}
/**
- * Returns the next event in the stream, waiting indefinitely if a new event
- * is not immediately available.
+ * Returns the next batch of events in the stream, waiting indefinitely if
+ * a new batch is not immediately available.
*
* @throws IOException see {@link DFSInotifyEventInputStream#poll()}
* @throws MissingEventsException see
* {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
- public Event take() throws IOException, InterruptedException,
+ public EventBatch take() throws IOException, InterruptedException,
MissingEventsException {
- Event next = null;
+ EventBatch next = null;
int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2
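For context, a minimal consumer-side sketch of the reworked batch API. It assumes the stream was obtained via HdfsAdmin#getInotifyEventStream() (the usual entry point, which is not part of this diff):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

class InotifyPollSketch {
  static void drain(DFSInotifyEventInputStream eis) throws Exception {
    EventBatch batch;
    // Wait up to one second per call; null means no batch arrived in time.
    while ((batch = eis.poll(1, TimeUnit.SECONDS)) != null) {
      for (Event e : batch.getEvents()) {
        // Every event in the batch shares the txid of its edit log op.
        System.out.println(batch.getTxid() + " " + e.getEventType());
      }
      // -1 means no estimate; a steadily growing value means we are lagging.
      long lag = eis.getTxidsBehindEstimate();
      if (lag > 0) {
        System.out.println("~" + lag + " txids behind the NameNode");
      }
    }
  }
}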
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
new file mode 100644
index 0000000..0ad1070
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A batch of events that all share the same transaction ID.
+ */
+@InterfaceAudience.Public
+public class EventBatch {
+ private final long txid;
+ private final Event[] events;
+
+ public EventBatch(long txid, Event[] events) {
+ this.txid = txid;
+ this.events = events;
+ }
+
+ public long getTxid() {
+ return txid;
+ }
+
+ public Event[] getEvents() { return events; }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
new file mode 100644
index 0000000..9c97038
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.List;
+
+/**
+ * Contains a list of event batches, the transaction ID in the edit log up to
+ * which we read to produce these events, and the first txid we observed when
+ * producing these events (the last of which is for the purpose of determining
+ * whether we have missed events due to edit deletion). Also contains the most
+ * recent txid that the NameNode has sync'ed, so the client can determine how
+ * far behind in the edit log it is.
+ */
+@InterfaceAudience.Private
+public class EventBatchList {
+ private List<EventBatch> batches;
+ private long firstTxid;
+ private long lastTxid;
+ private long syncTxid;
+
+ public EventBatchList(List<EventBatch> batches, long firstTxid,
+ long lastTxid, long syncTxid) {
+ this.batches = batches;
+ this.firstTxid = firstTxid;
+ this.lastTxid = lastTxid;
+ this.syncTxid = syncTxid;
+ }
+
+ public List<EventBatch> getBatches() {
+ return batches;
+ }
+
+ public long getFirstTxid() {
+ return firstTxid;
+ }
+
+ public long getLastTxid() {
+ return lastTxid;
+ }
+
+ public long getSyncTxid() {
+ return syncTxid;
+ }
+}
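These fields drive the gap check in DFSInotifyEventInputStream#poll() above. A sketch of that logic, assuming MissingEventsException exposes an (expectedTxid, actualTxid) constructor:

import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

class BatchListChecks {
  // If the NN could read edits (lastTxid != -1) but the oldest txid it
  // returned is past the one we asked for, the gap was purged from the
  // edit log and those events are gone for good.
  static void checkContiguous(EventBatchList el, long lastReadTxid)
      throws MissingEventsException {
    if (el.getLastTxid() != -1 && el.getFirstTxid() != lastReadTxid + 1) {
      throw new MissingEventsException(lastReadTxid + 1, el.getFirstTxid());
    }
  }
}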
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
deleted file mode 100644
index 6d02d3c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.inotify;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.util.List;
-
-/**
- * Contains a set of events, the transaction ID in the edit log up to which we
- * read to produce these events, and the first txid we observed when producing
- * these events (the last of which is for the purpose of determining whether we
- * have missed events due to edit deletion). Also contains the most recent txid
- * that the NameNode has sync'ed, so the client can determine how far behind in
- * the edit log it is.
- */
-@InterfaceAudience.Private
-public class EventsList {
- private List<Event> events;
- private long firstTxid;
- private long lastTxid;
- private long syncTxid;
-
- public EventsList(List<Event> events, long firstTxid, long lastTxid,
- long syncTxid) {
- this.events = events;
- this.firstTxid = firstTxid;
- this.lastTxid = lastTxid;
- this.syncTxid = syncTxid;
- }
-
- public List<Event> getEvents() {
- return events;
- }
-
- public long getFirstTxid() {
- return firstTxid;
- }
-
- public long getLastTxid() {
- return lastTxid;
- }
-
- public long getSyncTxid() {
- return syncTxid;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index d29d2eb..f3a390a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1405,9 +1405,9 @@ public interface ClientProtocol {
public long getCurrentEditLogTxid() throws IOException;
/**
- * Get an ordered list of events corresponding to the edit log transactions
- * from txid onwards.
+ * Get an ordered list of batches of events corresponding to the edit log
+ * transactions for txids equal to or greater than txid.
*/
@Idempotent
- public EventsList getEditsFromTxid(long txid) throws IOException;
+ public EventBatchList getEditsFromTxid(long txid) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 077a3e9..7a2dd15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -1480,7 +1480,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public EventsList getEditsFromTxid(long txid) throws IOException {
+ public EventBatchList getEditsFromTxid(long txid) throws IOException {
GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
.setTxid(txid).build();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ae9cb3e..c52588f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -2516,173 +2517,197 @@ public class PBHelper {
}
}
- public static EventsList convert(GetEditsFromTxidResponseProto resp) throws
+ public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws
IOException {
- List<Event> events = Lists.newArrayList();
- for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
- switch(p.getType()) {
- case EVENT_CLOSE:
- InotifyProtos.CloseEventProto close =
- InotifyProtos.CloseEventProto.parseFrom(p.getContents());
- events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
- close.getTimestamp()));
- break;
- case EVENT_CREATE:
- InotifyProtos.CreateEventProto create =
- InotifyProtos.CreateEventProto.parseFrom(p.getContents());
- events.add(new Event.CreateEvent.Builder()
- .iNodeType(createTypeConvert(create.getType()))
- .path(create.getPath())
- .ctime(create.getCtime())
- .ownerName(create.getOwnerName())
- .groupName(create.getGroupName())
- .perms(convert(create.getPerms()))
- .replication(create.getReplication())
- .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
- create.getSymlinkTarget())
- .overwrite(create.getOverwrite()).build());
- break;
- case EVENT_METADATA:
- InotifyProtos.MetadataUpdateEventProto meta =
- InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
- events.add(new Event.MetadataUpdateEvent.Builder()
- .path(meta.getPath())
- .metadataType(metadataUpdateTypeConvert(meta.getType()))
- .mtime(meta.getMtime())
- .atime(meta.getAtime())
- .replication(meta.getReplication())
- .ownerName(
- meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
- .groupName(
- meta.getGroupName().isEmpty() ? null : meta.getGroupName())
- .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
- .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
- meta.getAclsList()))
- .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
- meta.getXAttrsList()))
- .xAttrsRemoved(meta.getXAttrsRemoved())
- .build());
- break;
- case EVENT_RENAME:
- InotifyProtos.RenameEventProto rename =
- InotifyProtos.RenameEventProto.parseFrom(p.getContents());
- events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
- rename.getTimestamp()));
- break;
- case EVENT_APPEND:
- InotifyProtos.AppendEventProto reopen =
- InotifyProtos.AppendEventProto.parseFrom(p.getContents());
- events.add(new Event.AppendEvent(reopen.getPath()));
- break;
- case EVENT_UNLINK:
- InotifyProtos.UnlinkEventProto unlink =
- InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
- events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
- break;
- default:
- throw new RuntimeException("Unexpected inotify event type: " +
- p.getType());
+ final InotifyProtos.EventsListProto list = resp.getEventsList();
+ final long firstTxid = list.getFirstTxid();
+ final long lastTxid = list.getLastTxid();
+
+ List<EventBatch> batches = Lists.newArrayList();
+ if (list.getEventsList().size() > 0) {
+ throw new IOException("Can't handle old inotify server response.");
+ }
+ for (InotifyProtos.EventBatchProto bp : list.getBatchList()) {
+ long txid = bp.getTxid();
+ if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) {
+ throw new IOException("Error converting TxidResponseProto: got a " +
+ "transaction id " + txid + " that was outside the range of [" +
+ firstTxid + ", " + lastTxid + "].");
+ }
+ List<Event> events = Lists.newArrayList();
+ for (InotifyProtos.EventProto p : bp.getEventsList()) {
+ switch (p.getType()) {
+ case EVENT_CLOSE:
+ InotifyProtos.CloseEventProto close =
+ InotifyProtos.CloseEventProto.parseFrom(p.getContents());
+ events.add(new Event.CloseEvent(close.getPath(),
+ close.getFileSize(), close.getTimestamp()));
+ break;
+ case EVENT_CREATE:
+ InotifyProtos.CreateEventProto create =
+ InotifyProtos.CreateEventProto.parseFrom(p.getContents());
+ events.add(new Event.CreateEvent.Builder()
+ .iNodeType(createTypeConvert(create.getType()))
+ .path(create.getPath())
+ .ctime(create.getCtime())
+ .ownerName(create.getOwnerName())
+ .groupName(create.getGroupName())
+ .perms(convert(create.getPerms()))
+ .replication(create.getReplication())
+ .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
+ create.getSymlinkTarget())
+ .overwrite(create.getOverwrite()).build());
+ break;
+ case EVENT_METADATA:
+ InotifyProtos.MetadataUpdateEventProto meta =
+ InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
+ events.add(new Event.MetadataUpdateEvent.Builder()
+ .path(meta.getPath())
+ .metadataType(metadataUpdateTypeConvert(meta.getType()))
+ .mtime(meta.getMtime())
+ .atime(meta.getAtime())
+ .replication(meta.getReplication())
+ .ownerName(
+ meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
+ .groupName(
+ meta.getGroupName().isEmpty() ? null : meta.getGroupName())
+ .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
+ .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
+ meta.getAclsList()))
+ .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
+ meta.getXAttrsList()))
+ .xAttrsRemoved(meta.getXAttrsRemoved())
+ .build());
+ break;
+ case EVENT_RENAME:
+ InotifyProtos.RenameEventProto rename =
+ InotifyProtos.RenameEventProto.parseFrom(p.getContents());
+ events.add(new Event.RenameEvent(rename.getSrcPath(),
+ rename.getDestPath(), rename.getTimestamp()));
+ break;
+ case EVENT_APPEND:
+ InotifyProtos.AppendEventProto reopen =
+ InotifyProtos.AppendEventProto.parseFrom(p.getContents());
+ events.add(new Event.AppendEvent(reopen.getPath()));
+ break;
+ case EVENT_UNLINK:
+ InotifyProtos.UnlinkEventProto unlink =
+ InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
+ events.add(new Event.UnlinkEvent(unlink.getPath(),
+ unlink.getTimestamp()));
+ break;
+ default:
+ throw new RuntimeException("Unexpected inotify event type: " +
+ p.getType());
+ }
}
+ batches.add(new EventBatch(txid, events.toArray(new Event[0])));
}
- return new EventsList(events, resp.getEventsList().getFirstTxid(),
+ return new EventBatchList(batches, resp.getEventsList().getFirstTxid(),
resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
}
- public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) {
+ public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) {
InotifyProtos.EventsListProto.Builder builder =
InotifyProtos.EventsListProto.newBuilder();
- for (Event e : el.getEvents()) {
- switch(e.getEventType()) {
- case CLOSE:
- Event.CloseEvent ce = (Event.CloseEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_CLOSE)
- .setContents(
- InotifyProtos.CloseEventProto.newBuilder()
- .setPath(ce.getPath())
- .setFileSize(ce.getFileSize())
- .setTimestamp(ce.getTimestamp()).build().toByteString()
- ).build());
- break;
- case CREATE:
- Event.CreateEvent ce2 = (Event.CreateEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_CREATE)
- .setContents(
- InotifyProtos.CreateEventProto.newBuilder()
- .setType(createTypeConvert(ce2.getiNodeType()))
- .setPath(ce2.getPath())
- .setCtime(ce2.getCtime())
- .setOwnerName(ce2.getOwnerName())
- .setGroupName(ce2.getGroupName())
- .setPerms(convert(ce2.getPerms()))
- .setReplication(ce2.getReplication())
- .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
- "" : ce2.getSymlinkTarget())
- .setOverwrite(ce2.getOverwrite()).build().toByteString()
- ).build());
- break;
- case METADATA:
- Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
- InotifyProtos.MetadataUpdateEventProto.Builder metaB =
- InotifyProtos.MetadataUpdateEventProto.newBuilder()
- .setPath(me.getPath())
- .setType(metadataUpdateTypeConvert(me.getMetadataType()))
- .setMtime(me.getMtime())
- .setAtime(me.getAtime())
- .setReplication(me.getReplication())
- .setOwnerName(me.getOwnerName() == null ? "" :
- me.getOwnerName())
- .setGroupName(me.getGroupName() == null ? "" :
- me.getGroupName())
- .addAllAcls(me.getAcls() == null ?
- Lists.<AclEntryProto>newArrayList() :
- convertAclEntryProto(me.getAcls()))
- .addAllXAttrs(me.getxAttrs() == null ?
- Lists.<XAttrProto>newArrayList() :
- convertXAttrProto(me.getxAttrs()))
- .setXAttrsRemoved(me.isxAttrsRemoved());
- if (me.getPerms() != null) {
- metaB.setPerms(convert(me.getPerms()));
+ for (EventBatch b : el.getBatches()) {
+ List<InotifyProtos.EventProto> events = Lists.newArrayList();
+ for (Event e : b.getEvents()) {
+ switch (e.getEventType()) {
+ case CLOSE:
+ Event.CloseEvent ce = (Event.CloseEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_CLOSE)
+ .setContents(
+ InotifyProtos.CloseEventProto.newBuilder()
+ .setPath(ce.getPath())
+ .setFileSize(ce.getFileSize())
+ .setTimestamp(ce.getTimestamp()).build().toByteString()
+ ).build());
+ break;
+ case CREATE:
+ Event.CreateEvent ce2 = (Event.CreateEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_CREATE)
+ .setContents(
+ InotifyProtos.CreateEventProto.newBuilder()
+ .setType(createTypeConvert(ce2.getiNodeType()))
+ .setPath(ce2.getPath())
+ .setCtime(ce2.getCtime())
+ .setOwnerName(ce2.getOwnerName())
+ .setGroupName(ce2.getGroupName())
+ .setPerms(convert(ce2.getPerms()))
+ .setReplication(ce2.getReplication())
+ .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
+ "" : ce2.getSymlinkTarget())
+ .setOverwrite(ce2.getOverwrite()).build().toByteString()
+ ).build());
+ break;
+ case METADATA:
+ Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
+ InotifyProtos.MetadataUpdateEventProto.Builder metaB =
+ InotifyProtos.MetadataUpdateEventProto.newBuilder()
+ .setPath(me.getPath())
+ .setType(metadataUpdateTypeConvert(me.getMetadataType()))
+ .setMtime(me.getMtime())
+ .setAtime(me.getAtime())
+ .setReplication(me.getReplication())
+ .setOwnerName(me.getOwnerName() == null ? "" :
+ me.getOwnerName())
+ .setGroupName(me.getGroupName() == null ? "" :
+ me.getGroupName())
+ .addAllAcls(me.getAcls() == null ?
+ Lists.<AclEntryProto>newArrayList() :
+ convertAclEntryProto(me.getAcls()))
+ .addAllXAttrs(me.getxAttrs() == null ?
+ Lists.<XAttrProto>newArrayList() :
+ convertXAttrProto(me.getxAttrs()))
+ .setXAttrsRemoved(me.isxAttrsRemoved());
+ if (me.getPerms() != null) {
+ metaB.setPerms(convert(me.getPerms()));
+ }
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_METADATA)
+ .setContents(metaB.build().toByteString())
+ .build());
+ break;
+ case RENAME:
+ Event.RenameEvent re = (Event.RenameEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_RENAME)
+ .setContents(
+ InotifyProtos.RenameEventProto.newBuilder()
+ .setSrcPath(re.getSrcPath())
+ .setDestPath(re.getDstPath())
+ .setTimestamp(re.getTimestamp()).build().toByteString()
+ ).build());
+ break;
+ case APPEND:
+ Event.AppendEvent re2 = (Event.AppendEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_APPEND)
+ .setContents(
+ InotifyProtos.AppendEventProto.newBuilder()
+ .setPath(re2.getPath()).build().toByteString()
+ ).build());
+ break;
+ case UNLINK:
+ Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_UNLINK)
+ .setContents(
+ InotifyProtos.UnlinkEventProto.newBuilder()
+ .setPath(ue.getPath())
+ .setTimestamp(ue.getTimestamp()).build().toByteString()
+ ).build());
+ break;
+ default:
+ throw new RuntimeException("Unexpected inotify event: " + e);
}
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_METADATA)
- .setContents(metaB.build().toByteString())
- .build());
- break;
- case RENAME:
- Event.RenameEvent re = (Event.RenameEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_RENAME)
- .setContents(
- InotifyProtos.RenameEventProto.newBuilder()
- .setSrcPath(re.getSrcPath())
- .setDestPath(re.getDstPath())
- .setTimestamp(re.getTimestamp()).build().toByteString()
- ).build());
- break;
- case APPEND:
- Event.AppendEvent re2 = (Event.AppendEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_APPEND)
- .setContents(
- InotifyProtos.AppendEventProto.newBuilder()
- .setPath(re2.getPath()).build().toByteString()
- ).build());
- break;
- case UNLINK:
- Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_UNLINK)
- .setContents(
- InotifyProtos.UnlinkEventProto.newBuilder()
- .setPath(ue.getPath())
- .setTimestamp(ue.getTimestamp()).build().toByteString()
- ).build());
- break;
- default:
- throw new RuntimeException("Unexpected inotify event: " + e);
}
+ builder.addBatch(InotifyProtos.EventBatchProto.newBuilder().
+ setTxid(b.getTxid()).
+ addAllEvents(events));
}
builder.setFirstTxid(el.getFirstTxid());
builder.setLastTxid(el.getLastTxid());
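A quick way to sanity-check the two converters is a round trip, as a unit test might do. A sketch (the GetEditsFromTxidResponseProto import path is assumed; the single CloseEvent is illustrative):

import com.google.common.collect.Lists;

import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

class PBHelperRoundTripSketch {
  static void roundTrip() throws Exception {
    EventBatch b = new EventBatch(7L,
        new Event[] { new Event.CloseEvent("/f", 1024L, 1000L) });
    EventBatchList in = new EventBatchList(Lists.newArrayList(b), 7L, 7L, 9L);
    GetEditsFromTxidResponseProto resp = PBHelper.convertEditsResponse(in);
    EventBatchList out = PBHelper.convert(resp);
    // The batch txid and the list-level txids must survive the round trip.
    assert out.getBatches().size() == 1;
    assert out.getBatches().get(0).getTxid() == 7L;
    assert out.getFirstTxid() == 7L && out.getSyncTxid() == 9L;
  }
}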
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
index 00a5f25..cd3fc23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.Block;
import java.util.List;
@@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator {
return size;
}
- public static Event[] translate(FSEditLogOp op) {
+ public static EventBatch translate(FSEditLogOp op) {
switch(op.opCode) {
case OP_ADD:
FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
if (addOp.blocks.length == 0) { // create
- return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
+ return new EventBatch(op.txid,
+ new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
.ctime(addOp.atime)
.replication(addOp.replication)
.ownerName(addOp.permissions.getUserName())
.groupName(addOp.permissions.getGroupName())
.perms(addOp.permissions.getPermission())
.overwrite(addOp.overwrite)
- .iNodeType(Event.CreateEvent.INodeType.FILE).build() };
+ .iNodeType(Event.CreateEvent.INodeType.FILE).build() });
} else {
- return new Event[] { new Event.AppendEvent(addOp.path) };
+ return new EventBatch(op.txid,
+ new Event[] { new Event.AppendEvent(addOp.path) });
}
case OP_CLOSE:
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
- return new Event[] {
- new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
.path(setRepOp.path)
- .replication(setRepOp.replication).build() };
+ .replication(setRepOp.replication).build() });
case OP_CONCAT_DELETE:
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
List<Event> events = Lists.newArrayList();
@@ -73,73 +77,83 @@ public class InotifyFSEditLogOpTranslator {
events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
}
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
- return events.toArray(new Event[0]);
+ return new EventBatch(op.txid, events.toArray(new Event[0]));
case OP_RENAME_OLD:
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
- return new Event[] {
- new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.RenameEvent(rnOpOld.src,
+ rnOpOld.dst, rnOpOld.timestamp) });
case OP_RENAME:
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
- return new Event[] {
- new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) });
case OP_DELETE:
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
- return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.UnlinkEvent(delOp.path, delOp.timestamp) });
case OP_MKDIR:
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
- return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
+ return new EventBatch(op.txid,
+ new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
.ctime(mkOp.timestamp)
.ownerName(mkOp.permissions.getUserName())
.groupName(mkOp.permissions.getGroupName())
.perms(mkOp.permissions.getPermission())
- .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
+ .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() });
case OP_SET_PERMISSIONS:
FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
.path(permOp.src)
- .perms(permOp.permissions).build() };
+ .perms(permOp.permissions).build() });
case OP_SET_OWNER:
FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
.path(ownOp.src)
- .ownerName(ownOp.username).groupName(ownOp.groupname).build() };
+ .ownerName(ownOp.username).groupName(ownOp.groupname).build() });
case OP_TIMES:
FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
.path(timesOp.path)
- .atime(timesOp.atime).mtime(timesOp.mtime).build() };
+ .atime(timesOp.atime).mtime(timesOp.mtime).build() });
case OP_SYMLINK:
FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
- return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
+ return new EventBatch(op.txid,
+ new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
.ctime(symOp.atime)
.ownerName(symOp.permissionStatus.getUserName())
.groupName(symOp.permissionStatus.getGroupName())
.perms(symOp.permissionStatus.getPermission())
.symlinkTarget(symOp.value)
- .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
+ .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() });
case OP_REMOVE_XATTR:
FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(rxOp.src)
.xAttrs(rxOp.xAttrs)
- .xAttrsRemoved(true).build() };
+ .xAttrsRemoved(true).build() });
case OP_SET_XATTR:
FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(sxOp.src)
.xAttrs(sxOp.xAttrs)
- .xAttrsRemoved(false).build() };
+ .xAttrsRemoved(false).build() });
case OP_SET_ACL:
FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
.path(saOp.src)
- .acls(saOp.aclEntries).build() };
+ .acls(saOp.aclEntries).build() });
default:
return null;
}
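The practical upshot of returning one EventBatch per edit log op is that multi-event ops such as OP_CONCAT_DELETE now reach the client atomically instead of as separate events. A consumer-side sketch (a heuristic for illustration, not a real API; the shape matches the updated test below):

import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

class ConcatBatchSketch {
  // Per the test changes below, a concat with one source arrives as a
  // single 3-event batch: APPEND(target), UNLINK(source), CLOSE(target),
  // all stamped with the txid of the OP_CONCAT_DELETE op.
  static boolean looksLikeConcat(EventBatch batch) {
    Event[] ev = batch.getEvents();
    return ev.length >= 3
        && ev[0].getEventType() == Event.EventType.APPEND
        && ev[ev.length - 1].getEventType() == Event.EventType.CLOSE;
  }
}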
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 690d7e1..16cec5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -34,7 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@@ -55,8 +54,8 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
@@ -67,8 +66,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -139,10 +138,16 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
@@ -155,19 +160,12 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolP
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
-import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@@ -175,6 +173,7 @@ import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
/**
@@ -1670,7 +1669,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
- public EventsList getEditsFromTxid(long txid) throws IOException {
+ public EventBatchList getEditsFromTxid(long txid) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
@@ -1689,13 +1688,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// guaranteed to have been written by this NameNode.)
boolean readInProgress = syncTxid > 0;
- List<Event> events = Lists.newArrayList();
+ List<EventBatch> batches = Lists.newArrayList();
+ int totalEvents = 0;
long maxSeenTxid = -1;
long firstSeenTxid = -1;
if (syncTxid > 0 && txid > syncTxid) {
// we can't read past syncTxid, so there's no point in going any further
- return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+ return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
Collection<EditLogInputStream> streams = null;
@@ -1707,7 +1707,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
// will result
LOG.info("NN is transitioning from active to standby and FSEditLog " +
"is closed -- could not read edits");
- return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+ return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
boolean breakOuter = false;
@@ -1725,9 +1725,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
break;
}
- Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
- if (eventsFromOp != null) {
- events.addAll(Arrays.asList(eventsFromOp));
+ EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
+ if (eventBatch != null) {
+ batches.add(eventBatch);
+ totalEvents += eventBatch.getEvents().length;
}
if (op.getTransactionId() > maxSeenTxid) {
maxSeenTxid = op.getTransactionId();
@@ -1735,7 +1736,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (firstSeenTxid == -1) {
firstSeenTxid = op.getTransactionId();
}
- if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
+ if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
op.getTransactionId() == syncTxid)) {
// we're done
breakOuter = true;
@@ -1750,7 +1751,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
}
- return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+ return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
@Override // TraceAdminProtocol
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
index a1d4d92..e51c02c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -48,6 +48,11 @@ message EventProto {
required bytes contents = 2;
}
+message EventBatchProto {
+ required int64 txid = 1;
+ repeated EventProto events = 2;
+}
+
enum INodeType {
I_TYPE_FILE = 0x0;
I_TYPE_DIRECTORY = 0x1;
@@ -111,8 +116,9 @@ message UnlinkEventProto {
}
message EventsListProto {
- repeated EventProto events = 1;
+ repeated EventProto events = 1; // deprecated
required int64 firstTxid = 2;
required int64 lastTxid = 3;
required int64 syncTxid = 4;
-}
\ No newline at end of file
+ repeated EventBatchProto batch = 5;
+}
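Note the wire-compatibility scheme: the old flat events field keeps tag 1 (marked deprecated) so tag numbers stay stable, and batches are sent under the new tag 5. A new client rejects a response from an old server that still populates the legacy field, mirroring the guard in PBHelper.convert() above. A sketch (the InotifyProtos import path is assumed):

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;

class WireCompatCheck {
  static void rejectLegacy(InotifyProtos.EventsListProto list)
      throws IOException {
    if (list.getEventsCount() > 0) {
      // Field 1 populated means an old server; this client only
      // understands the batched encoding.
      throw new IOException("Can't handle old inotify server response.");
    }
  }
}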
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index a608ba8..82db110 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
@@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream {
private static final Log LOG = LogFactory.getLog(
TestDFSInotifyEventInputStream.class);
- private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
+ private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
throws IOException, MissingEventsException {
- Event next = null;
- while ((next = eis.poll()) == null);
- return next;
+ EventBatch batch = null;
+ while ((batch = eis.poll()) == null);
+ return batch;
+ }
+
+ private static long checkTxid(EventBatch batch, long prevTxid){
+ Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
+ "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
+ return batch.getTxid();
}
/**
@@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
- Assert.assertTrue(FSEditLogOpCodes.values().length == 47);
+ Assert.assertEquals(47, FSEditLogOpCodes.values().length);
}
@@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream {
"user::rwx,user:foo:rw-,group::r--,other::---", true));
client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
- Event next = null;
+ EventBatch batch = null;
// RenameOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
- Event.RenameEvent re = (Event.RenameEvent) next;
- Assert.assertTrue(re.getDstPath().equals("/file4"));
- Assert.assertTrue(re.getSrcPath().equals("/file"));
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ long txid = batch.getTxid();
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+ Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0];
+ Assert.assertEquals("/file4", re.getDstPath());
+ Assert.assertEquals("/file", re.getSrcPath());
Assert.assertTrue(re.getTimestamp() > 0);
- long eventsBehind = eis.getEventsBehindEstimate();
+ long eventsBehind = eis.getTxidsBehindEstimate();
// RenameOldOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
- Event.RenameEvent re2 = (Event.RenameEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+ Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0];
Assert.assertTrue(re2.getDstPath().equals("/file2"));
Assert.assertTrue(re2.getSrcPath().equals("/file4"));
Assert.assertTrue(re.getTimestamp() > 0);
// AddOp with overwrite
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Event.CreateEvent ce = (Event.CreateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/file2"));
Assert.assertTrue(ce.getCtime() > 0);
@@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce.getOverwrite());
// CloseOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
- Event.CloseEvent ce2 = (Event.CloseEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+ Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0];
Assert.assertTrue(ce2.getPath().equals("/file2"));
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
// AddOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
- Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
+ Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// CloseOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
- Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+ Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// TimesOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue.getPath().equals("/file2"));
Assert.assertTrue(mue.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.TIMES);
// SetReplicationOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue2.getPath().equals("/file2"));
Assert.assertTrue(mue2.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.REPLICATION);
Assert.assertTrue(mue2.getReplication() == 1);
// ConcatDeleteOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
- Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
- Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(3, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
+ Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
+ Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK);
+ Event.UnlinkEvent ue2 = (Event.UnlinkEvent) batch.getEvents()[1];
Assert.assertTrue(ue2.getPath().equals("/file3"));
Assert.assertTrue(ue2.getTimestamp() > 0);
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
- Event.CloseEvent ce3 = (Event.CloseEvent) next;
+ Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE);
+ Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2];
Assert.assertTrue(ce3.getPath().equals("/file2"));
Assert.assertTrue(ce3.getTimestamp() > 0);
// DeleteOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
- Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
+ Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0];
Assert.assertTrue(ue.getPath().equals("/file2"));
Assert.assertTrue(ue.getTimestamp() > 0);
// MkdirOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Event.CreateEvent ce4 = (Event.CreateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce4.getiNodeType() ==
Event.CreateEvent.INodeType.DIRECTORY);
Assert.assertTrue(ce4.getPath().equals("/dir"));
@@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce4.getSymlinkTarget() == null);
// SetPermissionsOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue3.getPath().equals("/dir"));
Assert.assertTrue(mue3.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.PERMS);
Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
// SetOwnerOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue4.getPath().equals("/dir"));
Assert.assertTrue(mue4.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.OWNER);
@@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue4.getGroupName().equals("groupname"));
// SymlinkOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Event.CreateEvent ce5 = (Event.CreateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Event.CreateEvent ce5 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce5.getiNodeType() ==
Event.CreateEvent.INodeType.SYMLINK);
Assert.assertTrue(ce5.getPath().equals("/dir2"));
@@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
// SetXAttrOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue5.getPath().equals("/file5"));
Assert.assertTrue(mue5.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
@@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(!mue5.isxAttrsRemoved());
// RemoveXAttrOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue6.getPath().equals("/file5"));
Assert.assertTrue(mue6.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
@@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue6.isxAttrsRemoved());
// SetAclOp (1)
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue7.getPath().equals("/file5"));
Assert.assertTrue(mue7.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
@@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream {
AclEntry.parseAclEntry("user::rwx", true)));
// SetAclOp (2)
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue8.getPath().equals("/file5"));
Assert.assertTrue(mue8.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
@@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream {
// and we should not have been behind at all when eventsBehind was set
// either, since there were few enough events that they should have all
// been read to the client during the first poll() call
- Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
+ Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
} finally {
cluster.shutdown();
@@ -329,13 +370,14 @@ public class TestDFSInotifyEventInputStream {
}
cluster.getDfsCluster().shutdownNameNode(0);
cluster.getDfsCluster().transitionToActive(1);
- Event next = null;
+ EventBatch batch = null;
// we can read all of the edits logged by the old active from the new
// active
for (int i = 0; i < 10; i++) {
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
@@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream {
// make sure that the old active can't read any further than the edits
// it logged itself (it has no idea whether the in-progress edits from
// the other writer have actually been committed)
- Event next = null;
+ EventBatch batch = null;
for (int i = 0; i < 10; i++) {
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
@@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream {
}, 1, TimeUnit.SECONDS);
// a very generous wait period -- the edit will definitely have been
// processed by the time this is up
- Event next = eis.poll(5, TimeUnit.SECONDS);
- Assert.assertTrue(next != null);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
+ EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(batch);
+ Assert.assertEquals(1, batch.getEvents().length);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
} finally {
cluster.shutdown();
}
}
-
}
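The txid plumbing exercised by the rewritten test above is the point of the patch: every EventBatch carries the transaction id it was read at, so a consumer can checkpoint batch.getTxid() and later resume from that point with getInotifyEventStream(txid). A minimal sketch of that resume pattern, assuming a DistributedFileSystem handle and an in-memory checkpoint (InotifyTailer and its lastReadTxid field are invented names for illustration, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.inotify.Event;
    import org.apache.hadoop.hdfs.inotify.EventBatch;
    import org.apache.hadoop.hdfs.inotify.MissingEventsException;

    public class InotifyTailer {
      // Hypothetical checkpoint; a real consumer would persist this.
      private long lastReadTxid = 0;

      public void drain(DistributedFileSystem dfs)
          throws IOException, MissingEventsException {
        // Resume from the last txid that was fully processed.
        DFSInotifyEventInputStream eis =
            dfs.getInotifyEventStream(lastReadTxid);
        EventBatch batch;
        // poll() returns null once the currently available edits are drained.
        while ((batch = eis.poll()) != null) {
          for (Event e : batch.getEvents()) {
            if (e.getEventType() == Event.EventType.CREATE) {
              System.out.println("created " + ((Event.CreateEvent) e).getPath());
            }
          }
          // Advance the checkpoint only after the whole batch is handled.
          lastReadTxid = batch.getTxid();
        }
      }
    }
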
[2/4] hadoop git commit: HDFS-8480. Fix performance and timeout
issues in HDFS-7929 by using hard-links to preserve old edit logs,
instead of copying them. (Zhe Zhang via Colin P. McCabe)
Posted by vi...@apache.org.
HDFS-8480. Fix performance and timeout issues in HDFS-7929 by using hard-links to preserve old edit logs, instead of copying them. (Zhe Zhang via Colin P. McCabe)
(cherry picked from commit 7b424f938c3c306795d574792b086d84e4f06425)
(cherry picked from commit cbd11681ce8a51d187d91748b67a708681e599de)
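The entire optimization in the NNUpgradeUtil hunk below is replacing the edit-log copy loop with a single java.nio.file.Files.createLink() call. A standalone sketch of that pattern, with made-up placeholder paths:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class HardLinkSketch {
      public static void main(String[] args) throws IOException {
        // Placeholder names; the patch links files from the previous.tmp
        // directory into current/ during NameNode upgrade.
        Path existing = Paths.get("previous.tmp", "edits_0000001-0000042");
        Path link = Paths.get("current", "edits_0000001-0000042");
        // A hard link adds a second directory entry for the same inode:
        // no bytes are copied and no extra disk space is consumed.
        Files.createLink(link, existing);
      }
    }

Because both names share one inode, deleting either entry later leaves the other intact, which is exactly what preserving the old logs requires.
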
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e86d1f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e86d1f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e86d1f2
Branch: refs/heads/branch-2.6.1
Commit: 2e86d1f25469af34e92a1eebd1f84b13a0261ee4
Parents: 4363145
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Jun 22 14:37:10 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 8 16:12:45 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 ++
.../hdfs/server/namenode/NNUpgradeUtil.java | 18 +------
.../org/apache/hadoop/hdfs/TestDFSUpgrade.java | 56 +++++++++++++++-----
3 files changed, 49 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e86d1f2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1f6ce36..622fb16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -29,6 +29,10 @@ Release 2.6.1 - UNRELEASED
OPTIMIZATIONS
+ HDFS-8480. Fix performance and timeout issues in HDFS-7929 by using
+ hard-links to preserve old edit logs, instead of copying them. (Zhe Zhang
+ via Colin P. McCabe)
+
BUG FIXES
HDFS-7425. NameNode block deletion logging uses incorrect appender.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e86d1f2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
index c01b11d..a4d9580 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -130,23 +131,8 @@ public abstract class NNUpgradeUtil {
for (String s : fileNameList) {
File prevFile = new File(tmpDir, s);
- Preconditions.checkState(prevFile.canRead(),
- "Edits log file " + s + " is not readable.");
File newFile = new File(curDir, prevFile.getName());
- Preconditions.checkState(newFile.createNewFile(),
- "Cannot create new edits log file in " + curDir);
- EditLogFileInputStream in = new EditLogFileInputStream(prevFile);
- EditLogFileOutputStream out =
- new EditLogFileOutputStream(conf, newFile, 512*1024);
- FSEditLogOp logOp = in.nextValidOp();
- while (logOp != null) {
- out.write(logOp);
- logOp = in.nextOp();
- }
- out.setReadyToFlush();
- out.flushAndSync(true);
- out.close();
- in.close();
+ Files.createLink(newFile.toPath(), prevFile.toPath());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e86d1f2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
index 49762ff..e5777c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
@@ -32,13 +32,17 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
+import java.nio.file.Files;
import java.util.List;
import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -46,7 +50,11 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
@@ -54,6 +62,8 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import static org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
+
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
@@ -466,29 +476,49 @@ public class TestDFSUpgrade {
log("Normal NameNode upgrade", 1);
File[] created =
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
- List<String> beforeUpgrade = new LinkedList<String>();
for (final File createdDir : created) {
String[] fileNameList = createdDir.list(EditLogsFilter.INSTANCE);
- Collections.addAll(beforeUpgrade, fileNameList);
+ for (String fileName : fileNameList) {
+ String tmpFileName = fileName + ".tmp";
+ File existingFile = new File(createdDir, fileName);
+ File tmpFile = new File(createdDir, tmpFileName);
+ Files.move(existingFile.toPath(), tmpFile.toPath());
+ File newFile = new File(createdDir, fileName);
+ Preconditions.checkState(newFile.createNewFile(),
+ "Cannot create new edits log file in " + createdDir);
+ EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
+ HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
+ false);
+ EditLogFileOutputStream out = new EditLogFileOutputStream(conf, newFile,
+ (int)tmpFile.length());
+ out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
+ FSEditLogOp logOp = in.readOp();
+ while (logOp != null) {
+ out.write(logOp);
+ logOp = in.readOp();
+ }
+ out.setReadyToFlush();
+ out.flushAndSync(true);
+ out.close();
+ Files.delete(tmpFile.toPath());
+ }
}
cluster = createCluster();
- List<String> afterUpgrade = new LinkedList<String>();
- for (final File createdDir : created) {
- String[] fileNameList = createdDir.list(EditLogsFilter.INSTANCE);
- Collections.addAll(afterUpgrade, fileNameList);
- }
-
- for (String s : beforeUpgrade) {
- assertTrue(afterUpgrade.contains(s));
- }
-
+ DFSInotifyEventInputStream ieis =
+ cluster.getFileSystem().getInotifyEventStream(0);
+ EventBatch batch = ieis.poll();
+ Event[] events = batch.getEvents();
+ assertTrue("Should be able to get transactions before the upgrade.",
+ events.length > 0);
+ assertEquals(events[0].getEventType(), Event.EventType.CREATE);
+ assertEquals(((CreateEvent) events[0]).getPath(), "/TestUpgrade");
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
}
- private static enum EditLogsFilter implements FilenameFilter {
+ private enum EditLogsFilter implements FilenameFilter {
INSTANCE;
@Override
[3/4] hadoop git commit: HDFS-8846. Add a unit test for INotify
functionality across a layout version upgrade (Zhe Zhang via Colin P. McCabe)
Posted by vi...@apache.org.
HDFS-8846. Add a unit test for INotify functionality across a layout version upgrade (Zhe Zhang via Colin P. McCabe)
(cherry picked from commit a4d9acc51d1a977bc333da17780c00c72e8546f1)
(cherry picked from commit 9264b7e119efb70fb355904652beeb97e7ad90b9)
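The test strategy below: unpack a storage directory produced by an older release (hadoop-252-dfs-dir.tgz), boot a MiniDFSCluster with StartupOption.UPGRADE while keeping the unpacked image, then verify the pre-upgrade edits are still readable through inotify. A condensed sketch of that skeleton (TestUpgradeSketch is an invented wrapper class; in the real test, conf is first pointed at the unpacked image via UpgradeUtilities):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
    import org.junit.Test;

    public class TestUpgradeSketch {
      @Test
      public void readsEditsAcrossLayoutUpgrade() throws Exception {
        Configuration conf = new HdfsConfiguration();
        // The real test calls UpgradeUtilities.initializeStorageStateConf()
        // here so conf points at the unpacked old-layout image.
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(0)
            .format(false)                         // keep the old image
            .manageDataDfsDirs(false)
            .manageNameDfsDirs(false)
            .startupOption(StartupOption.UPGRADE)  // run the layout upgrade
            .build();
        try {
          // Edits logged before the upgrade must be visible from txid 0.
          DFSInotifyEventInputStream ieis =
              cluster.getFileSystem().getInotifyEventStream(0);
          // ... assert on ieis.poll() batches as the full test does ...
        } finally {
          cluster.shutdown();
        }
      }
    }
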
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a976acc0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a976acc0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a976acc0
Branch: refs/heads/branch-2.6.1
Commit: a976acc02dadb8b29391f25a6bbabcbaf01449c8
Parents: 2e86d1f
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Aug 25 14:09:13 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 8 16:18:30 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/TestDFSInotifyEventInputStream.java | 2 +-
.../org/apache/hadoop/hdfs/TestDFSUpgrade.java | 79 +-------------
.../hadoop/hdfs/TestDFSUpgradeFromImage.java | 107 ++++++++++++++++++-
.../src/test/resources/hadoop-252-dfs-dir.tgz | Bin 0 -> 14112 bytes
5 files changed, 109 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a976acc0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 622fb16..287ffcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -163,6 +163,9 @@ Release 2.6.1 - UNRELEASED
HDFS-7446. HDFS inotify should have the ability to determine what txid it
has read up to (cmccabe)
+ HDFS-8846. Add a unit test for INotify functionality across a layout
+ version upgrade (Zhe Zhang via Colin P. McCabe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a976acc0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index 82db110..b386e45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -50,7 +50,7 @@ public class TestDFSInotifyEventInputStream {
private static final Log LOG = LogFactory.getLog(
TestDFSInotifyEventInputStream.class);
- private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
+ public static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
throws IOException, MissingEventsException {
EventBatch batch = null;
while ((batch = eis.poll()) == null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a976acc0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
index e5777c7..ab2fc49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
@@ -28,21 +28,14 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.nio.file.Files;
-import java.util.List;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -50,11 +43,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
@@ -62,8 +50,6 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
-
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
@@ -81,7 +67,7 @@ public class TestDFSUpgrade {
private Configuration conf;
private int testCounter = 0;
private MiniDFSCluster cluster = null;
-
+
/**
* Writes an INFO log message containing the parameters.
*/
@@ -466,67 +452,6 @@ public class TestDFSUpgrade {
}
}
- @Test
- public void testPreserveEditLogs() throws Exception {
- conf = new HdfsConfiguration();
- conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
- String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
- conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
-
- log("Normal NameNode upgrade", 1);
- File[] created =
- UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
- for (final File createdDir : created) {
- String[] fileNameList = createdDir.list(EditLogsFilter.INSTANCE);
- for (String fileName : fileNameList) {
- String tmpFileName = fileName + ".tmp";
- File existingFile = new File(createdDir, fileName);
- File tmpFile = new File(createdDir, tmpFileName);
- Files.move(existingFile.toPath(), tmpFile.toPath());
- File newFile = new File(createdDir, fileName);
- Preconditions.checkState(newFile.createNewFile(),
- "Cannot create new edits log file in " + createdDir);
- EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
- HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
- false);
- EditLogFileOutputStream out = new EditLogFileOutputStream(conf, newFile,
- (int)tmpFile.length());
- out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
- FSEditLogOp logOp = in.readOp();
- while (logOp != null) {
- out.write(logOp);
- logOp = in.readOp();
- }
- out.setReadyToFlush();
- out.flushAndSync(true);
- out.close();
- Files.delete(tmpFile.toPath());
- }
- }
-
- cluster = createCluster();
-
- DFSInotifyEventInputStream ieis =
- cluster.getFileSystem().getInotifyEventStream(0);
- EventBatch batch = ieis.poll();
- Event[] events = batch.getEvents();
- assertTrue("Should be able to get transactions before the upgrade.",
- events.length > 0);
- assertEquals(events[0].getEventType(), Event.EventType.CREATE);
- assertEquals(((CreateEvent) events[0]).getPath(), "/TestUpgrade");
- cluster.shutdown();
- UpgradeUtilities.createEmptyDirs(nameNodeDirs);
- }
-
- private enum EditLogsFilter implements FilenameFilter {
- INSTANCE;
-
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
- }
- }
-
public static void main(String[] args) throws Exception {
TestDFSUpgrade t = new TestDFSUpgrade();
TestDFSUpgrade.initialize();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a976acc0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
index ad907f6..ffbaa89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
@@ -18,10 +18,6 @@
package org.apache.hadoop.hdfs;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
@@ -40,10 +36,13 @@ import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
@@ -51,6 +50,9 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
import org.junit.Test;
+import static org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
+import static org.junit.Assert.*;
+
/**
* This tests data transfer protocol handling in the Datanode. It sends
* various forms of wrong data and verifies that Datanode handles it well.
@@ -74,6 +76,7 @@ public class TestDFSUpgradeFromImage {
private static final String HADOOP023_RESERVED_IMAGE =
"hadoop-0.23-reserved.tgz";
private static final String HADOOP2_RESERVED_IMAGE = "hadoop-2-reserved.tgz";
+ private static final String HADOOP252_IMAGE = "hadoop-252-dfs-dir.tgz";
private static class ReferenceFileInfo {
String path;
@@ -620,4 +623,100 @@ public class TestDFSUpgradeFromImage {
numDataNodes(1).enableManagedDfsDirsRedundancy(false).
manageDataDfsDirs(false), null);
}
+
+ @Test
+ public void testPreserveEditLogs() throws Exception {
+ unpackStorage(HADOOP252_IMAGE, HADOOP_DFS_DIR_TXT);
+ /**
+ * The pre-created image has the following edits:
+ * mkdir /input; mkdir /input/dir1~5
+ * copyFromLocal randome_file_1 /input/dir1
+ * copyFromLocal randome_file_2 /input/dir2
+ * mv /input/dir1/randome_file_1 /input/dir3/randome_file_3
+ * rmdir /input/dir1
+ */
+ Configuration conf = new HdfsConfiguration();
+ conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+ .format(false)
+ .manageDataDfsDirs(false)
+ .manageNameDfsDirs(false)
+ .startupOption(StartupOption.UPGRADE)
+ .build();
+ DFSInotifyEventInputStream ieis =
+ cluster.getFileSystem().getInotifyEventStream(0);
+
+ EventBatch batch;
+ Event.CreateEvent ce;
+ Event.RenameEvent re;
+
+ // mkdir /input
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ ce = (Event.CreateEvent) batch.getEvents()[0];
+    assertEquals("/input", ce.getPath());
+
+ // mkdir /input/dir1~5
+ for (int i = 1; i <= 5; i++) {
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ ce = (Event.CreateEvent) batch.getEvents()[0];
+      assertEquals("/input/dir" + i, ce.getPath());
+ }
+ // copyFromLocal randome_file_1~2 /input/dir1~2
+ for (int i = 1; i <= 2; i++) {
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+      if (batch.getEvents()[0].getEventType() != Event.EventType.CREATE) {
+        FSImage.LOG.debug("Unexpected event type: " + batch.getEvents()[0].getEventType());
+      }
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+
+ // copyFromLocal randome_file_1 /input/dir1, CLOSE
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+
+      // copyFromLocal randome_file_1 /input/dir1, RENAME
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() ==
+ Event.EventType.RENAME);
+ re = (Event.RenameEvent) batch.getEvents()[0];
+      assertEquals("/input/dir" + i + "/randome_file_" + i, re.getDstPath());
+ }
+
+ // mv /input/dir1/randome_file_1 /input/dir3/randome_file_3
+ long txIDBeforeRename = batch.getTxid();
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+ re = (Event.RenameEvent) batch.getEvents()[0];
+    assertEquals("/input/dir3/randome_file_3", re.getDstPath());
+
+
+ // rmdir /input/dir1
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
+    assertEquals("/input/dir1",
+        ((Event.UnlinkEvent) batch.getEvents()[0]).getPath());
+ long lastTxID = batch.getTxid();
+
+ // Start inotify from the tx before rename /input/dir1/randome_file_1
+ ieis = cluster.getFileSystem().getInotifyEventStream(txIDBeforeRename);
+ batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
+ assertEquals(1, batch.getEvents().length);
+ assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+ re = (Event.RenameEvent) batch.getEvents()[0];
+    assertEquals("/input/dir3/randome_file_3", re.getDstPath());
+
+ // Try to read beyond available edits
+ ieis = cluster.getFileSystem().getInotifyEventStream(lastTxID + 1);
+ assertNull(ieis.poll());
+
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a976acc0/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-252-dfs-dir.tgz
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-252-dfs-dir.tgz b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-252-dfs-dir.tgz
new file mode 100644
index 0000000..4ad3e25
Binary files /dev/null and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-252-dfs-dir.tgz differ
[4/4] hadoop git commit: HDFS-8384. Allow NN to startup if there are
files having a lease but are not under construction. Contributed by Jing
Zhao.
Posted by vi...@apache.org.
HDFS-8384. Allow NN to startup if there are files having a lease but are not under construction. Contributed by Jing Zhao.
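The shape of the fix, visible in both hunks below, is the same: an invariant that FSNamesystem and LeaseManager used to enforce with Preconditions.checkState, which aborts NameNode startup when violated, becomes a logged warning plus a skip (or a plain IOException). A self-contained toy illustration of that defensive rewrite (LeaseScan and LeasedFile are invented stand-ins, not HDFS classes):

    import java.util.Arrays;
    import java.util.List;

    public class LeaseScan {
      // Stand-in for an INodeFile that holds a lease.
      static final class LeasedFile {
        final String path;
        final boolean underConstruction;
        LeasedFile(String path, boolean uc) {
          this.path = path;
          this.underConstruction = uc;
        }
      }

      public static void main(String[] args) {
        List<LeasedFile> leased = Arrays.asList(
            new LeasedFile("/a", true),
            new LeasedFile("/b", false));  // stale lease, not under construction
        for (LeasedFile f : leased) {
          // Old behavior: Preconditions.checkState(f.underConstruction)
          // would throw here and abort the whole scan on /b.
          // New behavior: warn and keep going.
          if (!f.underConstruction) {
            System.err.println("WARN: " + f.path
                + " is not under construction but has a lease; skipping");
            continue;
          }
          System.out.println("processing lease for " + f.path);
        }
      }
    }
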
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33537078
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33537078
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33537078
Branch: refs/heads/branch-2.6.1
Commit: 33537078a80822e64f25d62478732b01704b40c1
Parents: a976acc
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Tue Sep 8 16:45:38 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 8 16:45:38 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 5 ++++-
.../hadoop/hdfs/server/namenode/LeaseManager.java | 14 +++++++++++---
3 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/33537078/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 287ffcb..eaaea5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -27,6 +27,9 @@ Release 2.6.1 - UNRELEASED
HDFS-8046. Allow better control of getContentSummary (kihwal)
+ HDFS-8384. Allow NN to startup if there are files having a lease but are not
+ under construction. (jing9)
+
OPTIMIZATIONS
HDFS-8480. Fix performance and timeout issues in HDFS-7929 by using
http://git-wip-us.apache.org/repos/asf/hadoop/blob/33537078/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 19edbb5..362b147 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -4693,7 +4693,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
- Preconditions.checkArgument(uc != null);
+ if (uc == null) {
+ throw new IOException("Cannot finalize file " + src
+ + " because it is not under construction");
+ }
leaseManager.removeLease(uc.getClientName(), src);
pendingFile.recordModification(latestSnapshot);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/33537078/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index e13a5c6..55ce0bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -116,7 +116,11 @@ public class LeaseManager {
final INodeFile cons;
try {
cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
- Preconditions.checkState(cons.isUnderConstruction());
+ if (!cons.isUnderConstruction()) {
+ LOG.warn("The file " + cons.getFullPathName()
+ + " is not under construction but has lease.");
+ continue;
+ }
} catch (UnresolvedLinkException e) {
throw new AssertionError("Lease files should reside on this FS");
}
@@ -444,8 +448,12 @@ public class LeaseManager {
// verify that path exists in namespace
try {
INodeFile node = INodeFile.valueOf(fsnamesystem.dir.getINode(p), p);
- Preconditions.checkState(node.isUnderConstruction());
- inodes.put(p, node);
+ if (node.isUnderConstruction()) {
+ inodes.put(p, node);
+ } else {
+ LOG.warn("Ignore the lease of file " + p
+ + " for checkpoint since the file is not under construction");
+ }
} catch (IOException ioe) {
LOG.error(ioe);
}