You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/11/28 02:55:14 UTC
[hadoop] branch branch-2.10 updated: HDFS-14986.
ReplicaCachingGetSpaceUsed throws ConcurrentModificationException.
Contributed by Aiphago.
This is an automated email from the ASF dual-hosted git repository.
yqlin pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 1a83415 HDFS-14986. ReplicaCachingGetSpaceUsed throws ConcurrentModificationException. Contributed by Aiphago.
1a83415 is described below
commit 1a834157602069ed82e29e380e1d660a10592daa
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Thu Nov 28 10:43:35 2019 +0800
HDFS-14986. ReplicaCachingGetSpaceUsed throws ConcurrentModificationException. Contributed by Aiphago.
(cherry picked from commit 2b452b4e6063072b2bec491edd3f412eb7ac21f3)
---
.../org/apache/hadoop/fs/CachingGetSpaceUsed.java | 34 +++++++++++--
.../server/datanode/fsdataset/FsDatasetSpi.java | 6 +++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 12 ++---
.../fsdataset/impl/ReplicaCachingGetSpaceUsed.java | 1 +
.../impl/TestReplicaCachingGetSpaceUsed.java | 55 ++++++++++++++++++++++
5 files changed, 98 insertions(+), 10 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
index 92476d7..58dc82d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
@@ -47,6 +47,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
private final long jitter;
private final String dirPath;
private Thread refreshUsed;
+ private boolean shouldFirstRefresh;
/**
* This is the constructor used by the builder.
@@ -79,16 +80,30 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
this.refreshInterval = interval;
this.jitter = jitter;
this.used.set(initialUsed);
+ this.shouldFirstRefresh = true;
}
void init() {
if (used.get() < 0) {
used.set(0);
+ if (!shouldFirstRefresh) {
+ // The initial refresh was skipped, so the first refresh must be
+ // performed immediately in the refresh thread.
+ initRefeshThread(true);
+ return;
+ }
refresh();
}
+ initRefeshThread(false);
+ }
+ /**
+ * runImmediately should be set to true if the initial refresh was skipped.
+ * @param runImmediately Whether to refresh immediately on start; defaults to false.
+ */
+ private void initRefeshThread (boolean runImmediately) {
if (refreshInterval > 0) {
- refreshUsed = new Thread(new RefreshThread(this),
+ refreshUsed = new Thread(new RefreshThread(this, runImmediately),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
@@ -101,6 +116,14 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
protected abstract void refresh();
/**
+ * Set whether the first refresh still needs to be performed.
+ * @param shouldFirstRefresh The flag value to set.
+ */
+ protected void setShouldFirstRefresh(boolean shouldFirstRefresh) {
+ this.shouldFirstRefresh = shouldFirstRefresh;
+ }
+
+ /**
* @return an estimate of space used in the directory path.
*/
@Override public long getUsed() throws IOException {
@@ -156,9 +179,11 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
private static final class RefreshThread implements Runnable {
final CachingGetSpaceUsed spaceUsed;
+ private boolean runImmediately;
- RefreshThread(CachingGetSpaceUsed spaceUsed) {
+ RefreshThread(CachingGetSpaceUsed spaceUsed, boolean runImmediately) {
this.spaceUsed = spaceUsed;
+ this.runImmediately = runImmediately;
}
@Override
@@ -176,7 +201,10 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
}
// Make sure that after the jitter we didn't end up at 0.
refreshInterval = Math.max(refreshInterval, 1);
- Thread.sleep(refreshInterval);
+ if (!runImmediately) {
+ Thread.sleep(refreshInterval);
+ }
+ runImmediately = false;
// update the used variable
spaceUsed.refresh();
} catch (InterruptedException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 3f73e01..7953d45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -662,5 +662,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
AutoCloseableLock acquireDatasetLock();
+ /**
+ * Deep copy the replica info belonging to given block pool.
+ * @param bpid Specified block pool id.
+ * @return A set of replica info.
+ * @throws IOException if the replica information cannot be copied.
+ */
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8d236d5..086e98d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -210,16 +210,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- /**
- * The deepCopyReplica call doesn't use the datasetock since it will lead the
- * potential deadlock with the {@link FsVolumeList#addBlockPool} call.
- */
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
- Set<? extends Replica> replicas =
- new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
- : volumeMap.replicas(bpid));
+ Set<? extends Replica> replicas = null;
+ try (AutoCloseableLock lock = datasetLock.acquire()) {
+ replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
+ EMPTY_SET : volumeMap.replicas(bpid));
+ }
return Collections.unmodifiableSet(replicas);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
index e887aa2..f765e5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
@@ -59,6 +59,7 @@ public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed {
public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
super(builder);
+ setShouldFirstRefresh(false);
volume = builder.getVolume();
bpid = builder.getBpid();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
index 45a3916..6abf523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,8 +28,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +40,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
+import java.util.Set;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.junit.Assert.assertEquals;
@@ -145,4 +150,54 @@ public class TestReplicaCachingGetSpaceUsed {
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
}
+
+ @Test(timeout = 15000)
+ public void testFsDatasetImplDeepCopyReplica() {
+ FsDatasetSpi<?> fsDataset = dataNode.getFSDataset();
+ ModifyThread modifyThread = new ModifyThread();
+ modifyThread.start();
+ String bpid = cluster.getNamesystem(0).getBlockPoolId();
+ int retryTimes = 10;
+
+ while (retryTimes > 0) {
+ try {
+ Set<? extends Replica> replicas = fsDataset.deepCopyReplica(bpid);
+ if (replicas.size() > 0) {
+ retryTimes--;
+ }
+ } catch (IOException e) {
+ modifyThread.setShouldRun(false);
+ Assert.fail("Encounter IOException when deep copy replica.");
+ }
+ }
+ modifyThread.setShouldRun(false);
+ }
+
+ private class ModifyThread extends Thread {
+ private boolean shouldRun = true;
+
+ @Override
+ public void run() {
+ FSDataOutputStream os = null;
+ while (shouldRun) {
+ try {
+ int id = RandomUtils.nextInt();
+ os = fs.create(new Path("/testFsDatasetImplDeepCopyReplica/" + id));
+ byte[] bytes = new byte[2048];
+ InputStream is = new ByteArrayInputStream(bytes);
+ IOUtils.copyBytes(is, os, bytes.length);
+ os.hsync();
+ os.close();
+ } catch (IOException e) {}
+ }
+
+ try {
+ fs.delete(new Path("/testFsDatasetImplDeepCopyReplica"), true);
+ } catch (IOException e) {}
+ }
+
+ private void setShouldRun(boolean shouldRun) {
+ this.shouldRun = shouldRun;
+ }
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org