You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/26 18:05:23 UTC
[16/50] [abbrv] hadoop git commit: HDFS-7446. HDFS inotify should
have the ability to determine what txid it has read up to (cmccabe) (cherry
picked from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)
HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe)
(cherry picked from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)
(cherry picked from commit 06552a15d5172a2b0ad3d61aa7f9a849857385aa)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43631451
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43631451
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43631451
Branch: refs/heads/branch-2.6
Commit: 4363145128f91b2fb1f1c0254ee5e8621a1ac383
Parents: 8ed162b
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Nov 25 17:44:34 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 8 15:32:17 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 65 ++--
.../apache/hadoop/hdfs/inotify/EventBatch.java | 41 +++
.../hadoop/hdfs/inotify/EventBatchList.java | 63 ++++
.../apache/hadoop/hdfs/inotify/EventsList.java | 63 ----
.../hadoop/hdfs/protocol/ClientProtocol.java | 8 +-
.../ClientNamenodeProtocolTranslatorPB.java | 4 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 341 ++++++++++---------
.../namenode/InotifyFSEditLogOpTranslator.java | 74 ++--
.../hdfs/server/namenode/NameNodeRpcServer.java | 45 +--
.../hadoop-hdfs/src/main/proto/inotify.proto | 10 +-
.../hdfs/TestDFSInotifyEventInputStream.java | 209 +++++++-----
12 files changed, 526 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d2f07c2..1f6ce36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -156,6 +156,9 @@ Release 2.6.1 - UNRELEASED
HDFS-7980. Incremental BlockReport will dramatically slow down namenode
startup. (Walter Su via szetszwo)
+ HDFS-7446. HDFS inotify should have the ability to determine what txid it
+ has read up to (cmccabe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
index 73c5f55..83b92b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
@@ -19,11 +19,10 @@
package org.apache.hadoop.hdfs;
import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
@@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* Stream for reading inotify events. DFSInotifyEventInputStreams should not
@@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream {
.class);
private final ClientProtocol namenode;
- private Iterator<Event> it;
+ private Iterator<EventBatch> it;
private long lastReadTxid;
/**
* The most recent txid the NameNode told us it has sync'ed -- helps us
@@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream {
}
/**
- * Returns the next event in the stream or null if no new events are currently
- * available.
+ * Returns the next batch of events in the stream or null if no new
+ * batches are currently available.
*
* @throws IOException because of network error or edit log
* corruption. Also possible if JournalNodes are unresponsive in the
* QJM setting (even one unresponsive JournalNode is enough in rare cases),
* so catching this exception and retrying at least a few times is
* recommended.
- * @throws MissingEventsException if we cannot return the next event in the
- * stream because the data for the event (and possibly some subsequent events)
- * has been deleted (generally because this stream is a very large number of
- * events behind the current state of the NameNode). It is safe to continue
- * reading from the stream after this exception is thrown -- the next
- * available event will be returned.
+ * @throws MissingEventsException if we cannot return the next batch in the
+ * stream because the data for the events (and possibly some subsequent
+ * events) has been deleted (generally because this stream is a very large
+ * number of transactions behind the current state of the NameNode). It is
+ * safe to continue reading from the stream after this exception is thrown.
+ * The next available batch of events will be returned.
*/
- public Event poll() throws IOException, MissingEventsException {
+ public EventBatch poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
@@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream {
return null;
}
if (!it.hasNext()) {
- EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
+ EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
- it = el.getEvents().iterator();
+ it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
@@ -131,18 +124,18 @@ public class DFSInotifyEventInputStream {
}
/**
- * Return a estimate of how many events behind the NameNode's current state
- * this stream is. Clients should periodically call this method and check if
- * its result is steadily increasing, which indicates that they are falling
- * behind (i.e. events are being generated faster than the client is reading
- * them). If a client falls too far behind events may be deleted before the
- * client can read them.
+ * Return an estimate of how many transaction IDs behind the NameNode's
+ * current state this stream is. Clients should periodically call this method
+ * and check if its result is steadily increasing, which indicates that they
+ * are falling behind (i.e. transactions are being generated faster than the
+ * client is reading them). If a client falls too far behind, events may be
+ * deleted before the client can read them.
* <p/>
* A return value of -1 indicates that an estimate could not be produced, and
* should be ignored. The value returned by this method is really only useful
* when compared to previous or subsequent returned values.
*/
- public long getEventsBehindEstimate() {
+ public long getTxidsBehindEstimate() {
if (syncTxid == 0) {
return -1;
} else {
@@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream {
}
/**
- * Returns the next event in the stream, waiting up to the specified amount of
- * time for a new event. Returns null if a new event is not available at the
+ * Returns the next event batch in the stream, waiting up to the specified
+ * amount of time for a new batch. Returns null if one is not available at the
* end of the specified amount of time. The time before the method returns may
* exceed the specified amount of time by up to the time required for an RPC
* to the NameNode.
@@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream {
* see {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
- public Event poll(long time, TimeUnit tu) throws IOException,
+ public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
- Event next = null;
+ EventBatch next = null;
while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) {
@@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream {
}
/**
- * Returns the next event in the stream, waiting indefinitely if a new event
- * is not immediately available.
+ * Returns the next batch of events in the stream, waiting indefinitely if
+ * a new batch is not immediately available.
*
* @throws IOException see {@link DFSInotifyEventInputStream#poll()}
* @throws MissingEventsException see
* {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
- public Event take() throws IOException, InterruptedException,
+ public EventBatch take() throws IOException, InterruptedException,
MissingEventsException {
- Event next = null;
+ EventBatch next = null;
int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
new file mode 100644
index 0000000..0ad1070
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A batch of events that all happened on the same transaction ID.
+ */
+@InterfaceAudience.Public
+public class EventBatch {
+ private final long txid;
+ private final Event[] events;
+
+ public EventBatch(long txid, Event[] events) {
+ this.txid = txid;
+ this.events = events;
+ }
+
+ public long getTxid() {
+ return txid;
+ }
+
+ public Event[] getEvents() { return events; }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
new file mode 100644
index 0000000..9c97038
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.List;
+
+/**
+ * Contains a list of event batches, the transaction ID in the edit log up to
+ * which we read to produce these events, and the first txid we observed when
+ * producing these events (the last of which is for the purpose of determining
+ * whether we have missed events due to edit deletion). Also contains the most
+ * recent txid that the NameNode has sync'ed, so the client can determine how
+ * far behind in the edit log it is.
+ */
+@InterfaceAudience.Private
+public class EventBatchList {
+ private List<EventBatch> batches;
+ private long firstTxid;
+ private long lastTxid;
+ private long syncTxid;
+
+ public EventBatchList(List<EventBatch> batches, long firstTxid,
+ long lastTxid, long syncTxid) {
+ this.batches = batches;
+ this.firstTxid = firstTxid;
+ this.lastTxid = lastTxid;
+ this.syncTxid = syncTxid;
+ }
+
+ public List<EventBatch> getBatches() {
+ return batches;
+ }
+
+ public long getFirstTxid() {
+ return firstTxid;
+ }
+
+ public long getLastTxid() {
+ return lastTxid;
+ }
+
+ public long getSyncTxid() {
+ return syncTxid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
deleted file mode 100644
index 6d02d3c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.inotify;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.util.List;
-
-/**
- * Contains a set of events, the transaction ID in the edit log up to which we
- * read to produce these events, and the first txid we observed when producing
- * these events (the last of which is for the purpose of determining whether we
- * have missed events due to edit deletion). Also contains the most recent txid
- * that the NameNode has sync'ed, so the client can determine how far behind in
- * the edit log it is.
- */
-@InterfaceAudience.Private
-public class EventsList {
- private List<Event> events;
- private long firstTxid;
- private long lastTxid;
- private long syncTxid;
-
- public EventsList(List<Event> events, long firstTxid, long lastTxid,
- long syncTxid) {
- this.events = events;
- this.firstTxid = firstTxid;
- this.lastTxid = lastTxid;
- this.syncTxid = syncTxid;
- }
-
- public List<Event> getEvents() {
- return events;
- }
-
- public long getFirstTxid() {
- return firstTxid;
- }
-
- public long getLastTxid() {
- return lastTxid;
- }
-
- public long getSyncTxid() {
- return syncTxid;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index d29d2eb..f3a390a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1405,9 +1405,9 @@ public interface ClientProtocol {
public long getCurrentEditLogTxid() throws IOException;
/**
- * Get an ordered list of events corresponding to the edit log transactions
- * from txid onwards.
+ * Get an ordered list of batches of events corresponding to the edit log
+ * transactions for txids equal to or greater than txid.
*/
@Idempotent
- public EventsList getEditsFromTxid(long txid) throws IOException;
+ public EventBatchList getEditsFromTxid(long txid) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 077a3e9..7a2dd15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -1480,7 +1480,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public EventsList getEditsFromTxid(long txid) throws IOException {
+ public EventBatchList getEditsFromTxid(long txid) throws IOException {
GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
.setTxid(txid).build();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ae9cb3e..c52588f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -2516,173 +2517,197 @@ public class PBHelper {
}
}
- public static EventsList convert(GetEditsFromTxidResponseProto resp) throws
+ public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws
IOException {
- List<Event> events = Lists.newArrayList();
- for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
- switch(p.getType()) {
- case EVENT_CLOSE:
- InotifyProtos.CloseEventProto close =
- InotifyProtos.CloseEventProto.parseFrom(p.getContents());
- events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
- close.getTimestamp()));
- break;
- case EVENT_CREATE:
- InotifyProtos.CreateEventProto create =
- InotifyProtos.CreateEventProto.parseFrom(p.getContents());
- events.add(new Event.CreateEvent.Builder()
- .iNodeType(createTypeConvert(create.getType()))
- .path(create.getPath())
- .ctime(create.getCtime())
- .ownerName(create.getOwnerName())
- .groupName(create.getGroupName())
- .perms(convert(create.getPerms()))
- .replication(create.getReplication())
- .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
- create.getSymlinkTarget())
- .overwrite(create.getOverwrite()).build());
- break;
- case EVENT_METADATA:
- InotifyProtos.MetadataUpdateEventProto meta =
- InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
- events.add(new Event.MetadataUpdateEvent.Builder()
- .path(meta.getPath())
- .metadataType(metadataUpdateTypeConvert(meta.getType()))
- .mtime(meta.getMtime())
- .atime(meta.getAtime())
- .replication(meta.getReplication())
- .ownerName(
- meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
- .groupName(
- meta.getGroupName().isEmpty() ? null : meta.getGroupName())
- .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
- .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
- meta.getAclsList()))
- .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
- meta.getXAttrsList()))
- .xAttrsRemoved(meta.getXAttrsRemoved())
- .build());
- break;
- case EVENT_RENAME:
- InotifyProtos.RenameEventProto rename =
- InotifyProtos.RenameEventProto.parseFrom(p.getContents());
- events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
- rename.getTimestamp()));
- break;
- case EVENT_APPEND:
- InotifyProtos.AppendEventProto reopen =
- InotifyProtos.AppendEventProto.parseFrom(p.getContents());
- events.add(new Event.AppendEvent(reopen.getPath()));
- break;
- case EVENT_UNLINK:
- InotifyProtos.UnlinkEventProto unlink =
- InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
- events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
- break;
- default:
- throw new RuntimeException("Unexpected inotify event type: " +
- p.getType());
+ final InotifyProtos.EventsListProto list = resp.getEventsList();
+ final long firstTxid = list.getFirstTxid();
+ final long lastTxid = list.getLastTxid();
+
+ List<EventBatch> batches = Lists.newArrayList();
+ if (list.getEventsList().size() > 0) {
+ throw new IOException("Can't handle old inotify server response.");
+ }
+ for (InotifyProtos.EventBatchProto bp : list.getBatchList()) {
+ long txid = bp.getTxid();
+ if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) {
+ throw new IOException("Error converting TxidResponseProto: got a " +
+ "transaction id " + txid + " that was outside the range of [" +
+ firstTxid + ", " + lastTxid + "].");
+ }
+ List<Event> events = Lists.newArrayList();
+ for (InotifyProtos.EventProto p : bp.getEventsList()) {
+ switch (p.getType()) {
+ case EVENT_CLOSE:
+ InotifyProtos.CloseEventProto close =
+ InotifyProtos.CloseEventProto.parseFrom(p.getContents());
+ events.add(new Event.CloseEvent(close.getPath(),
+ close.getFileSize(), close.getTimestamp()));
+ break;
+ case EVENT_CREATE:
+ InotifyProtos.CreateEventProto create =
+ InotifyProtos.CreateEventProto.parseFrom(p.getContents());
+ events.add(new Event.CreateEvent.Builder()
+ .iNodeType(createTypeConvert(create.getType()))
+ .path(create.getPath())
+ .ctime(create.getCtime())
+ .ownerName(create.getOwnerName())
+ .groupName(create.getGroupName())
+ .perms(convert(create.getPerms()))
+ .replication(create.getReplication())
+ .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
+ create.getSymlinkTarget())
+ .overwrite(create.getOverwrite()).build());
+ break;
+ case EVENT_METADATA:
+ InotifyProtos.MetadataUpdateEventProto meta =
+ InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
+ events.add(new Event.MetadataUpdateEvent.Builder()
+ .path(meta.getPath())
+ .metadataType(metadataUpdateTypeConvert(meta.getType()))
+ .mtime(meta.getMtime())
+ .atime(meta.getAtime())
+ .replication(meta.getReplication())
+ .ownerName(
+ meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
+ .groupName(
+ meta.getGroupName().isEmpty() ? null : meta.getGroupName())
+ .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
+ .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
+ meta.getAclsList()))
+ .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
+ meta.getXAttrsList()))
+ .xAttrsRemoved(meta.getXAttrsRemoved())
+ .build());
+ break;
+ case EVENT_RENAME:
+ InotifyProtos.RenameEventProto rename =
+ InotifyProtos.RenameEventProto.parseFrom(p.getContents());
+ events.add(new Event.RenameEvent(rename.getSrcPath(),
+ rename.getDestPath(), rename.getTimestamp()));
+ break;
+ case EVENT_APPEND:
+ InotifyProtos.AppendEventProto reopen =
+ InotifyProtos.AppendEventProto.parseFrom(p.getContents());
+ events.add(new Event.AppendEvent(reopen.getPath()));
+ break;
+ case EVENT_UNLINK:
+ InotifyProtos.UnlinkEventProto unlink =
+ InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
+ events.add(new Event.UnlinkEvent(unlink.getPath(),
+ unlink.getTimestamp()));
+ break;
+ default:
+ throw new RuntimeException("Unexpected inotify event type: " +
+ p.getType());
+ }
}
+ batches.add(new EventBatch(txid, events.toArray(new Event[0])));
}
- return new EventsList(events, resp.getEventsList().getFirstTxid(),
+ return new EventBatchList(batches, resp.getEventsList().getFirstTxid(),
resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
}
- public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) {
+ public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) {
InotifyProtos.EventsListProto.Builder builder =
InotifyProtos.EventsListProto.newBuilder();
- for (Event e : el.getEvents()) {
- switch(e.getEventType()) {
- case CLOSE:
- Event.CloseEvent ce = (Event.CloseEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_CLOSE)
- .setContents(
- InotifyProtos.CloseEventProto.newBuilder()
- .setPath(ce.getPath())
- .setFileSize(ce.getFileSize())
- .setTimestamp(ce.getTimestamp()).build().toByteString()
- ).build());
- break;
- case CREATE:
- Event.CreateEvent ce2 = (Event.CreateEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_CREATE)
- .setContents(
- InotifyProtos.CreateEventProto.newBuilder()
- .setType(createTypeConvert(ce2.getiNodeType()))
- .setPath(ce2.getPath())
- .setCtime(ce2.getCtime())
- .setOwnerName(ce2.getOwnerName())
- .setGroupName(ce2.getGroupName())
- .setPerms(convert(ce2.getPerms()))
- .setReplication(ce2.getReplication())
- .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
- "" : ce2.getSymlinkTarget())
- .setOverwrite(ce2.getOverwrite()).build().toByteString()
- ).build());
- break;
- case METADATA:
- Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
- InotifyProtos.MetadataUpdateEventProto.Builder metaB =
- InotifyProtos.MetadataUpdateEventProto.newBuilder()
- .setPath(me.getPath())
- .setType(metadataUpdateTypeConvert(me.getMetadataType()))
- .setMtime(me.getMtime())
- .setAtime(me.getAtime())
- .setReplication(me.getReplication())
- .setOwnerName(me.getOwnerName() == null ? "" :
- me.getOwnerName())
- .setGroupName(me.getGroupName() == null ? "" :
- me.getGroupName())
- .addAllAcls(me.getAcls() == null ?
- Lists.<AclEntryProto>newArrayList() :
- convertAclEntryProto(me.getAcls()))
- .addAllXAttrs(me.getxAttrs() == null ?
- Lists.<XAttrProto>newArrayList() :
- convertXAttrProto(me.getxAttrs()))
- .setXAttrsRemoved(me.isxAttrsRemoved());
- if (me.getPerms() != null) {
- metaB.setPerms(convert(me.getPerms()));
+ for (EventBatch b : el.getBatches()) {
+ List<InotifyProtos.EventProto> events = Lists.newArrayList();
+ for (Event e : b.getEvents()) {
+ switch (e.getEventType()) {
+ case CLOSE:
+ Event.CloseEvent ce = (Event.CloseEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_CLOSE)
+ .setContents(
+ InotifyProtos.CloseEventProto.newBuilder()
+ .setPath(ce.getPath())
+ .setFileSize(ce.getFileSize())
+ .setTimestamp(ce.getTimestamp()).build().toByteString()
+ ).build());
+ break;
+ case CREATE:
+ Event.CreateEvent ce2 = (Event.CreateEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_CREATE)
+ .setContents(
+ InotifyProtos.CreateEventProto.newBuilder()
+ .setType(createTypeConvert(ce2.getiNodeType()))
+ .setPath(ce2.getPath())
+ .setCtime(ce2.getCtime())
+ .setOwnerName(ce2.getOwnerName())
+ .setGroupName(ce2.getGroupName())
+ .setPerms(convert(ce2.getPerms()))
+ .setReplication(ce2.getReplication())
+ .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
+ "" : ce2.getSymlinkTarget())
+ .setOverwrite(ce2.getOverwrite()).build().toByteString()
+ ).build());
+ break;
+ case METADATA:
+ Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
+ InotifyProtos.MetadataUpdateEventProto.Builder metaB =
+ InotifyProtos.MetadataUpdateEventProto.newBuilder()
+ .setPath(me.getPath())
+ .setType(metadataUpdateTypeConvert(me.getMetadataType()))
+ .setMtime(me.getMtime())
+ .setAtime(me.getAtime())
+ .setReplication(me.getReplication())
+ .setOwnerName(me.getOwnerName() == null ? "" :
+ me.getOwnerName())
+ .setGroupName(me.getGroupName() == null ? "" :
+ me.getGroupName())
+ .addAllAcls(me.getAcls() == null ?
+ Lists.<AclEntryProto>newArrayList() :
+ convertAclEntryProto(me.getAcls()))
+ .addAllXAttrs(me.getxAttrs() == null ?
+ Lists.<XAttrProto>newArrayList() :
+ convertXAttrProto(me.getxAttrs()))
+ .setXAttrsRemoved(me.isxAttrsRemoved());
+ if (me.getPerms() != null) {
+ metaB.setPerms(convert(me.getPerms()));
+ }
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_METADATA)
+ .setContents(metaB.build().toByteString())
+ .build());
+ break;
+ case RENAME:
+ Event.RenameEvent re = (Event.RenameEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_RENAME)
+ .setContents(
+ InotifyProtos.RenameEventProto.newBuilder()
+ .setSrcPath(re.getSrcPath())
+ .setDestPath(re.getDstPath())
+ .setTimestamp(re.getTimestamp()).build().toByteString()
+ ).build());
+ break;
+ case APPEND:
+ Event.AppendEvent re2 = (Event.AppendEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_APPEND)
+ .setContents(
+ InotifyProtos.AppendEventProto.newBuilder()
+ .setPath(re2.getPath()).build().toByteString()
+ ).build());
+ break;
+ case UNLINK:
+ Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
+ events.add(InotifyProtos.EventProto.newBuilder()
+ .setType(InotifyProtos.EventType.EVENT_UNLINK)
+ .setContents(
+ InotifyProtos.UnlinkEventProto.newBuilder()
+ .setPath(ue.getPath())
+ .setTimestamp(ue.getTimestamp()).build().toByteString()
+ ).build());
+ break;
+ default:
+ throw new RuntimeException("Unexpected inotify event: " + e);
}
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_METADATA)
- .setContents(metaB.build().toByteString())
- .build());
- break;
- case RENAME:
- Event.RenameEvent re = (Event.RenameEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_RENAME)
- .setContents(
- InotifyProtos.RenameEventProto.newBuilder()
- .setSrcPath(re.getSrcPath())
- .setDestPath(re.getDstPath())
- .setTimestamp(re.getTimestamp()).build().toByteString()
- ).build());
- break;
- case APPEND:
- Event.AppendEvent re2 = (Event.AppendEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_APPEND)
- .setContents(
- InotifyProtos.AppendEventProto.newBuilder()
- .setPath(re2.getPath()).build().toByteString()
- ).build());
- break;
- case UNLINK:
- Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
- builder.addEvents(InotifyProtos.EventProto.newBuilder()
- .setType(InotifyProtos.EventType.EVENT_UNLINK)
- .setContents(
- InotifyProtos.UnlinkEventProto.newBuilder()
- .setPath(ue.getPath())
- .setTimestamp(ue.getTimestamp()).build().toByteString()
- ).build());
- break;
- default:
- throw new RuntimeException("Unexpected inotify event: " + e);
}
+ builder.addBatch(InotifyProtos.EventBatchProto.newBuilder().
+ setTxid(b.getTxid()).
+ addAllEvents(events));
}
builder.setFirstTxid(el.getFirstTxid());
builder.setLastTxid(el.getLastTxid());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
index 00a5f25..cd3fc23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.Block;
import java.util.List;
@@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator {
return size;
}
- public static Event[] translate(FSEditLogOp op) {
+ public static EventBatch translate(FSEditLogOp op) {
switch(op.opCode) {
case OP_ADD:
FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
if (addOp.blocks.length == 0) { // create
- return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
+ return new EventBatch(op.txid,
+ new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
.ctime(addOp.atime)
.replication(addOp.replication)
.ownerName(addOp.permissions.getUserName())
.groupName(addOp.permissions.getGroupName())
.perms(addOp.permissions.getPermission())
.overwrite(addOp.overwrite)
- .iNodeType(Event.CreateEvent.INodeType.FILE).build() };
+ .iNodeType(Event.CreateEvent.INodeType.FILE).build() });
} else {
- return new Event[] { new Event.AppendEvent(addOp.path) };
+ return new EventBatch(op.txid,
+ new Event[] { new Event.AppendEvent(addOp.path) });
}
case OP_CLOSE:
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
- return new Event[] {
- new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
.path(setRepOp.path)
- .replication(setRepOp.replication).build() };
+ .replication(setRepOp.replication).build() });
case OP_CONCAT_DELETE:
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
List<Event> events = Lists.newArrayList();
@@ -73,73 +77,83 @@ public class InotifyFSEditLogOpTranslator {
events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
}
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
- return events.toArray(new Event[0]);
+ return new EventBatch(op.txid, events.toArray(new Event[0]));
case OP_RENAME_OLD:
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
- return new Event[] {
- new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.RenameEvent(rnOpOld.src,
+ rnOpOld.dst, rnOpOld.timestamp) });
case OP_RENAME:
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
- return new Event[] {
- new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) });
case OP_DELETE:
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
- return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
+ return new EventBatch(op.txid, new Event[] {
+ new Event.UnlinkEvent(delOp.path, delOp.timestamp) });
case OP_MKDIR:
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
- return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
+ return new EventBatch(op.txid,
+ new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
.ctime(mkOp.timestamp)
.ownerName(mkOp.permissions.getUserName())
.groupName(mkOp.permissions.getGroupName())
.perms(mkOp.permissions.getPermission())
- .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
+ .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() });
case OP_SET_PERMISSIONS:
FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
.path(permOp.src)
- .perms(permOp.permissions).build() };
+ .perms(permOp.permissions).build() });
case OP_SET_OWNER:
FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
.path(ownOp.src)
- .ownerName(ownOp.username).groupName(ownOp.groupname).build() };
+ .ownerName(ownOp.username).groupName(ownOp.groupname).build() });
case OP_TIMES:
FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
.path(timesOp.path)
- .atime(timesOp.atime).mtime(timesOp.mtime).build() };
+ .atime(timesOp.atime).mtime(timesOp.mtime).build() });
case OP_SYMLINK:
FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
- return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
+ return new EventBatch(op.txid,
+ new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
.ctime(symOp.atime)
.ownerName(symOp.permissionStatus.getUserName())
.groupName(symOp.permissionStatus.getGroupName())
.perms(symOp.permissionStatus.getPermission())
.symlinkTarget(symOp.value)
- .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
+ .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() });
case OP_REMOVE_XATTR:
FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(rxOp.src)
.xAttrs(rxOp.xAttrs)
- .xAttrsRemoved(true).build() };
+ .xAttrsRemoved(true).build() });
case OP_SET_XATTR:
FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(sxOp.src)
.xAttrs(sxOp.xAttrs)
- .xAttrsRemoved(false).build() };
+ .xAttrsRemoved(false).build() });
case OP_SET_ACL:
FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
- return new Event[] { new Event.MetadataUpdateEvent.Builder()
+ return new EventBatch(op.txid,
+ new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
.path(saOp.src)
- .acls(saOp.aclEntries).build() };
+ .acls(saOp.aclEntries).build() });
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 690d7e1..16cec5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -34,7 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@@ -55,8 +54,8 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
@@ -67,8 +66,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -139,10 +138,16 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
@@ -155,19 +160,12 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolP
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
-import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@@ -175,6 +173,7 @@ import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
/**
@@ -1670,7 +1669,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
- public EventsList getEditsFromTxid(long txid) throws IOException {
+ public EventBatchList getEditsFromTxid(long txid) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
@@ -1689,13 +1688,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// guaranteed to have been written by this NameNode.)
boolean readInProgress = syncTxid > 0;
- List<Event> events = Lists.newArrayList();
+ List<EventBatch> batches = Lists.newArrayList();
+ int totalEvents = 0;
long maxSeenTxid = -1;
long firstSeenTxid = -1;
if (syncTxid > 0 && txid > syncTxid) {
// we can't read past syncTxid, so there's no point in going any further
- return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+ return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
Collection<EditLogInputStream> streams = null;
@@ -1707,7 +1707,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
// will result
LOG.info("NN is transitioning from active to standby and FSEditLog " +
"is closed -- could not read edits");
- return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+ return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
boolean breakOuter = false;
@@ -1725,9 +1725,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
break;
}
- Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
- if (eventsFromOp != null) {
- events.addAll(Arrays.asList(eventsFromOp));
+ EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
+ if (eventBatch != null) {
+ batches.add(eventBatch);
+ totalEvents += eventBatch.getEvents().length;
}
if (op.getTransactionId() > maxSeenTxid) {
maxSeenTxid = op.getTransactionId();
@@ -1735,7 +1736,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (firstSeenTxid == -1) {
firstSeenTxid = op.getTransactionId();
}
- if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
+ if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
op.getTransactionId() == syncTxid)) {
// we're done
breakOuter = true;
@@ -1750,7 +1751,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
}
- return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+ return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
@Override // TraceAdminProtocol
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
index a1d4d92..e51c02c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -48,6 +48,11 @@ message EventProto {
required bytes contents = 2;
}
+message EventBatchProto {
+ required int64 txid = 1;
+ repeated EventProto events = 2;
+}
+
enum INodeType {
I_TYPE_FILE = 0x0;
I_TYPE_DIRECTORY = 0x1;
@@ -111,8 +116,9 @@ message UnlinkEventProto {
}
message EventsListProto {
- repeated EventProto events = 1;
+ repeated EventProto events = 1; // deprecated
required int64 firstTxid = 2;
required int64 lastTxid = 3;
required int64 syncTxid = 4;
-}
\ No newline at end of file
+ repeated EventBatchProto batch = 5;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index a608ba8..82db110 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
@@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream {
private static final Log LOG = LogFactory.getLog(
TestDFSInotifyEventInputStream.class);
- private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
+ private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
throws IOException, MissingEventsException {
- Event next = null;
- while ((next = eis.poll()) == null);
- return next;
+ EventBatch batch = null;
+ while ((batch = eis.poll()) == null);
+ return batch;
+ }
+
+ private static long checkTxid(EventBatch batch, long prevTxid){
+ Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
+ "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
+ return batch.getTxid();
}
/**
@@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
- Assert.assertTrue(FSEditLogOpCodes.values().length == 47);
+ Assert.assertEquals(47, FSEditLogOpCodes.values().length);
}
@@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream {
"user::rwx,user:foo:rw-,group::r--,other::---", true));
client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
- Event next = null;
+ EventBatch batch = null;
// RenameOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
- Event.RenameEvent re = (Event.RenameEvent) next;
- Assert.assertTrue(re.getDstPath().equals("/file4"));
- Assert.assertTrue(re.getSrcPath().equals("/file"));
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ long txid = batch.getTxid();
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+ Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0];
+ Assert.assertEquals("/file4", re.getDstPath());
+ Assert.assertEquals("/file", re.getSrcPath());
Assert.assertTrue(re.getTimestamp() > 0);
- long eventsBehind = eis.getEventsBehindEstimate();
+ long eventsBehind = eis.getTxidsBehindEstimate();
// RenameOldOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
- Event.RenameEvent re2 = (Event.RenameEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+ Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0];
Assert.assertTrue(re2.getDstPath().equals("/file2"));
Assert.assertTrue(re2.getSrcPath().equals("/file4"));
Assert.assertTrue(re.getTimestamp() > 0);
// AddOp with overwrite
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Event.CreateEvent ce = (Event.CreateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/file2"));
Assert.assertTrue(ce.getCtime() > 0);
@@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce.getOverwrite());
// CloseOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
- Event.CloseEvent ce2 = (Event.CloseEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+ Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0];
Assert.assertTrue(ce2.getPath().equals("/file2"));
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
// AddOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
- Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
+ Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// CloseOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
- Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+ Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// TimesOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue.getPath().equals("/file2"));
Assert.assertTrue(mue.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.TIMES);
// SetReplicationOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue2.getPath().equals("/file2"));
Assert.assertTrue(mue2.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.REPLICATION);
Assert.assertTrue(mue2.getReplication() == 1);
// ConcatDeleteOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
- Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
- Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(3, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
+ Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
+ Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK);
+ Event.UnlinkEvent ue2 = (Event.UnlinkEvent) batch.getEvents()[1];
Assert.assertTrue(ue2.getPath().equals("/file3"));
Assert.assertTrue(ue2.getTimestamp() > 0);
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
- Event.CloseEvent ce3 = (Event.CloseEvent) next;
+ Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE);
+ Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2];
Assert.assertTrue(ce3.getPath().equals("/file2"));
Assert.assertTrue(ce3.getTimestamp() > 0);
// DeleteOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
- Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
+ Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0];
Assert.assertTrue(ue.getPath().equals("/file2"));
Assert.assertTrue(ue.getTimestamp() > 0);
// MkdirOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Event.CreateEvent ce4 = (Event.CreateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce4.getiNodeType() ==
Event.CreateEvent.INodeType.DIRECTORY);
Assert.assertTrue(ce4.getPath().equals("/dir"));
@@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce4.getSymlinkTarget() == null);
// SetPermissionsOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue3.getPath().equals("/dir"));
Assert.assertTrue(mue3.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.PERMS);
Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
// SetOwnerOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue4.getPath().equals("/dir"));
Assert.assertTrue(mue4.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.OWNER);
@@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue4.getGroupName().equals("groupname"));
// SymlinkOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Event.CreateEvent ce5 = (Event.CreateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Event.CreateEvent ce5 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce5.getiNodeType() ==
Event.CreateEvent.INodeType.SYMLINK);
Assert.assertTrue(ce5.getPath().equals("/dir2"));
@@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
// SetXAttrOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue5.getPath().equals("/file5"));
Assert.assertTrue(mue5.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
@@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(!mue5.isxAttrsRemoved());
// RemoveXAttrOp
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue6.getPath().equals("/file5"));
Assert.assertTrue(mue6.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
@@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue6.isxAttrsRemoved());
// SetAclOp (1)
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue7.getPath().equals("/file5"));
Assert.assertTrue(mue7.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
@@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream {
AclEntry.parseAclEntry("user::rwx", true)));
// SetAclOp (2)
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
- Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ txid = checkTxid(batch, txid);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+ Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue8.getPath().equals("/file5"));
Assert.assertTrue(mue8.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
@@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream {
// and we should not have been behind at all when eventsBehind was set
// either, since there were few enough events that they should have all
// been read to the client during the first poll() call
- Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
+ Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
} finally {
cluster.shutdown();
@@ -329,13 +370,14 @@ public class TestDFSInotifyEventInputStream {
}
cluster.getDfsCluster().shutdownNameNode(0);
cluster.getDfsCluster().transitionToActive(1);
- Event next = null;
+ EventBatch batch = null;
// we can read all of the edits logged by the old active from the new
// active
for (int i = 0; i < 10; i++) {
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
@@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream {
// make sure that the old active can't read any further than the edits
// it logged itself (it has no idea whether the in-progress edits from
// the other writer have actually been committed)
- Event next = null;
+ EventBatch batch = null;
for (int i = 0; i < 10; i++) {
- next = waitForNextEvent(eis);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+ batch = waitForNextEvents(eis);
+ Assert.assertEquals(1, batch.getEvents().length);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
@@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream {
}, 1, TimeUnit.SECONDS);
// a very generous wait period -- the edit will definitely have been
// processed by the time this is up
- Event next = eis.poll(5, TimeUnit.SECONDS);
- Assert.assertTrue(next != null);
- Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
- Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
+ EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(batch);
+ Assert.assertEquals(1, batch.getEvents().length);
+ Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+ Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
} finally {
cluster.shutdown();
}
}
-
}