You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/09/06 20:45:27 UTC
[02/17] hadoop git commit: HDFS-12383. Re-encryption updater should handle canceled tasks better.
HDFS-12383. Re-encryption updater should handle canceled tasks better.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b2235b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b2235b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b2235b3
Branch: refs/heads/HDFS-7240
Commit: 8b2235b367298e494e58e43dc9fc7cdee1ae79ae
Parents: c0b19a9
Author: Xiao Chen <xi...@apache.org>
Authored: Sun Sep 3 19:50:21 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Sep 6 12:14:47 2017 -0700
----------------------------------------------------------------------
.../server/namenode/ReencryptionUpdater.java | 15 ++-
.../hdfs/server/namenode/TestReencryption.java | 105 +++++++++++++++++--
2 files changed, 108 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b2235b3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
index 690a0e9..d641ea1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -72,6 +73,7 @@ public final class ReencryptionUpdater implements Runnable {
private final StopWatch throttleTimerLocked = new StopWatch();
private volatile long faultRetryInterval = 60000;
+ private volatile boolean isRunning = false;
/**
* Class to track re-encryption submissions of a single zone. It contains
@@ -201,6 +203,11 @@ public final class ReencryptionUpdater implements Runnable {
pauseZoneId = zoneId;
}
+ @VisibleForTesting
+ boolean isRunning() {
+ return isRunning;
+ }
+
private final FSDirectory dir;
private final CompletionService<ReencryptionTask> batchService;
private final ReencryptionHandler handler;
@@ -242,6 +249,7 @@ public final class ReencryptionUpdater implements Runnable {
@Override
public void run() {
+ isRunning = true;
throttleTimerAll.start();
while (true) {
try {
@@ -250,11 +258,13 @@ public final class ReencryptionUpdater implements Runnable {
} catch (InterruptedException ie) {
LOG.warn("Re-encryption updater thread interrupted. Exiting.");
Thread.currentThread().interrupt();
+ isRunning = false;
return;
- } catch (IOException ioe) {
- LOG.warn("Re-encryption updater thread exception.", ioe);
+ } catch (IOException | CancellationException e) {
+ LOG.warn("Re-encryption updater thread exception.", e);
} catch (Throwable t) {
LOG.error("Re-encryption updater thread exiting.", t);
+ isRunning = false;
return;
}
}
@@ -405,6 +415,7 @@ public final class ReencryptionUpdater implements Runnable {
if (completed.isCancelled()) {
LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}",
task.zoneId, task.lastFile);
+ return;
}
boolean shouldRetry;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b2235b3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
index 7ba3f91..4b5be2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Supplier;
@@ -1086,12 +1087,9 @@ public class TestReencryption {
getEzManager().resumeReencryptForTesting();
Thread.sleep(3000);
- EncryptionZoneManager ezm = getEzManager();
- ReencryptionHandler handler = (ReencryptionHandler) Whitebox
- .getInternalState(ezm, "reencryptionHandler");
Map<Long, ZoneSubmissionTracker> tasks =
(Map<Long, ZoneSubmissionTracker>) Whitebox
- .getInternalState(handler, "submissions");
+ .getInternalState(getHandler(), "submissions");
List<Future> futures = new LinkedList<>();
for (ZoneSubmissionTracker zst : tasks.values()) {
for (Future f : zst.getTasks()) {
@@ -1494,6 +1492,88 @@ public class TestReencryption {
}
@Test
+ public void testCancelFuture() throws Exception {
+ final AtomicBoolean callableRunning = new AtomicBoolean(false);
+ class MyInjector extends EncryptionFaultInjector {
+ private volatile int exceptionCount = 0;
+
+ MyInjector(int numFailures) {
+ exceptionCount = numFailures;
+ }
+
+ @Override
+ public void reencryptEncryptedKeys() throws IOException {
+ if (exceptionCount > 0) {
+ exceptionCount--;
+ try {
+ callableRunning.set(true);
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException ie) {
+ LOG.info("Fault injector interrupted", ie);
+ }
+ }
+ }
+ }
+ final MyInjector injector = new MyInjector(1);
+ EncryptionFaultInjector.instance = injector;
+
+ /* Setup test dir:
+ * /zones/zone/[0-9]
+ * /dir/f
+ */
+ final int len = 8196;
+ final Path zoneParent = new Path("/zones");
+ final Path zone = new Path(zoneParent, "zone");
+ fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+ dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+ for (int i = 0; i < 10; ++i) {
+ DFSTestUtil
+ .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
+ 0xFEED);
+ }
+ final Path subdir = new Path("/dir");
+ fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
+ DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
+
+ // re-encrypt 10 files, so 2 callables. Hang 1, pause the updater so the
+ // callable is taken from the executor but not processed.
+ fsn.getProvider().rollNewVersion(TEST_KEY);
+ fsn.getProvider().flush();
+ getEzManager().pauseReencryptForTesting();
+ dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+ waitForQueuedZones(1);
+ getEzManager().resumeReencryptForTesting();
+
+ LOG.info("Waiting for re-encrypt callables to run");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callableRunning.get();
+ }
+ }, 100, 10000);
+
+ getEzManager().pauseReencryptUpdaterForTesting();
+ dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
+
+ // now resume updater and verify status.
+ getEzManager().resumeReencryptUpdaterForTesting();
+ waitForZoneCompletes(zone.toString());
+
+ RemoteIterator<ZoneReencryptionStatus> it =
+ dfsAdmin.listReencryptionStatus();
+ assertTrue(it.hasNext());
+ final ZoneReencryptionStatus zs = it.next();
+ assertEquals(zone.toString(), zs.getZoneName());
+ assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+ assertTrue(zs.isCanceled());
+ assertTrue(zs.getCompletionTime() > 0);
+ assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+ assertEquals(0, zs.getFilesReencrypted());
+
+ assertTrue(getUpdater().isRunning());
+ }
+
+ @Test
public void testReencryptCancelForUpdater() throws Exception {
/* Setup test dir:
* /zones/zone/[0-9]
@@ -1822,12 +1902,7 @@ public class TestReencryption {
fsn.getProvider().rollNewVersion(TEST_KEY);
fsn.getProvider().flush();
- final EncryptionZoneManager ezm = getEzManager();
- final ReencryptionHandler handler = (ReencryptionHandler) Whitebox
- .getInternalState(ezm, "reencryptionHandler");
- final ReencryptionUpdater updater = (ReencryptionUpdater) Whitebox
- .getInternalState(handler, "reencryptionUpdater");
- Whitebox.setInternalState(updater, "faultRetryInterval", 50);
+ Whitebox.setInternalState(getUpdater(), "faultRetryInterval", 50);
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
waitForReencryptedZones(1);
assertEquals(0, injector.exceptionCount);
@@ -1844,4 +1919,14 @@ public class TestReencryption {
assertEquals(10, zs.getFilesReencrypted());
assertEquals(0, zs.getNumReencryptionFailures());
}
+
+ private ReencryptionHandler getHandler() {
+ return (ReencryptionHandler) Whitebox
+ .getInternalState(getEzManager(), "reencryptionHandler");
+ }
+
+ private ReencryptionUpdater getUpdater() {
+ return (ReencryptionUpdater) Whitebox
+ .getInternalState(getHandler(), "reencryptionUpdater");
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org