You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/09/19 22:51:00 UTC

tez git commit: TEZ-3434. Add unit tests for flushing of recovery events. (Harish Jaiprakash via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master b17edc401 -> 9cf25d142


TEZ-3434. Add unit tests for flushing of recovery events. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9cf25d14
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9cf25d14
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9cf25d14

Branch: refs/heads/master
Commit: 9cf25d142b4c30c7d903c1ae03bca7b070b706b0
Parents: b17edc4
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Sep 19 15:50:22 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Sep 19 15:50:22 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/history/recovery/RecoveryService.java   |   3 +-
 .../history/recovery/TestRecoveryService.java   | 390 +++++++++++++++----
 3 files changed, 325 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed0ef7b..c7f540b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3434. Add unit tests for flushing of recovery events.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
   TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list.

http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 3eeddf5..8c29172 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -69,7 +69,8 @@ public class RecoveryService extends AbstractService {
   @VisibleForTesting
   public static final boolean TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT = true;
 
-  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+  @VisibleForTesting
+  LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
   private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
   private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();

http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index d828d6b..3dec1d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -17,90 +17,140 @@
  */
 package org.apache.tez.dag.history.recovery;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
-import org.junit.Before;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
 public class TestRecoveryService {
 
-  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
       + TestRecoveryService.class.getName() + "-tmpDir";
 
+  private static final long startTime = System.currentTimeMillis();
+  private static final ApplicationId appId = ApplicationId.newInstance(startTime, 1);
+  private static final ApplicationAttemptId appAttemptId =
+      ApplicationAttemptId.newInstance(appId, 1);
+  private static final TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+  private static final TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
+  private static final TezTaskID tezTaskId = TezTaskID.getInstance(vertexId, 1);
+
   private Configuration conf;
+  private AppContext appContext;
+  private MockRecoveryService recoveryService;
+  private Path dagRecoveryPath;
+  private Path summaryPath;
+  private FileSystem fs;
+  private FSDataOutputStream dagFos;
+  private FSDataOutputStream summaryFos;
 
-  @Before
-  public void setUp() throws IllegalArgumentException, IOException {
-    this.conf = new Configuration();
-    FileSystem localFS = FileSystem.getLocal(conf);
-    localFS.delete(new Path(TEST_ROOT_DIR), true);
-  }
+  private void setup(boolean useMockFs, String[][] configs) throws Exception {
+    conf = new Configuration();
+    if (configs != null) {
+      for (String[] config : configs) {
+        conf.set(config[0], config[1]);
+      }
+    }
 
-  @Test(timeout = 5000)
-  public void testDrainEvents() throws IOException {
-    Configuration conf = new Configuration();
-    AppContext appContext = mock(AppContext.class);
-    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    appContext = mock(AppContext.class);
     when(appContext.getClock()).thenReturn(new SystemClock());
     when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
-    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     when(appContext.getApplicationID()).thenReturn(appId);
 
-    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    if (useMockFs) {
+      fs = mock(FileSystem.class);
+      when(appContext.getCurrentRecoveryDir()).thenReturn(new Path("mockfs:///"));
+      conf.set("fs.mockfs.impl", MockFileSystem.class.getName());
+      MockFileSystem.delegate = fs;
+      dagFos = spy(new FSDataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {}
+      }, null));
+      summaryFos = spy(new FSDataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {}
+      }, null));
+    } else {
+      when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+      fs = FileSystem.getLocal(conf);
+      fs.delete(new Path(TEST_ROOT_DIR), true);
+    }
+
+    recoveryService = new MockRecoveryService(appContext);
     conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
     recoveryService.init(conf);
+
+    summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath);
+    dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(
+        recoveryService.recoveryPath, dagId.toString());
+    if (useMockFs) {
+      when(fs.create(eq(dagRecoveryPath), eq(false), anyInt())).thenReturn(dagFos);
+      when(fs.create(eq(summaryPath), eq(false), anyInt())).thenReturn(summaryFos);
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testDrainEvents() throws Exception {
+    setup(false, null);
     recoveryService.start();
-    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
     int randEventCount = new Random().nextInt(100) + 100;
     for (int i=0; i< randEventCount; ++i) {
       recoveryService.handle(new DAGHistoryEvent(dagId,
-          new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+          new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
     }
     recoveryService.stop();
     assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get());
   }
 
   @Test(timeout = 5000)
-  public void testMultipleDAGFinishedEvent() throws IOException {
-    Configuration conf = new Configuration();
-    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    AppContext appContext = mock(AppContext.class);
-    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
-    when(appContext.getClock()).thenReturn(new SystemClock());
-    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
-    when(appContext.getApplicationID()).thenReturn(appId);
-
-    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
-    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
-    recoveryService.init(conf);
+  public void testMultipleDAGFinishedEvent() throws Exception {
+    setup(false, null);
     recoveryService.start();
-    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
     int randEventCount = new Random().nextInt(100) + 100;
     for (int i=0; i< randEventCount; ++i) {
       recoveryService.handle(new DAGHistoryEvent(dagId,
-          new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+          new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
     }
     recoveryService.await();
     assertTrue(recoveryService.outputStreamMap.containsKey(dagId));
@@ -120,23 +170,10 @@ public class TestRecoveryService {
   }
 
   @Test(timeout = 5000)
-  public void testSummaryPathExisted() throws IOException {
-    Configuration conf = new Configuration();
-    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    AppContext appContext = mock(AppContext.class);
-    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
-    when(appContext.getClock()).thenReturn(new SystemClock());
-    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
-    when(appContext.getApplicationID()).thenReturn(appId);
-
-    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
-    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
-    recoveryService.init(conf);
+  public void testSummaryPathExisted() throws Exception {
+    setup(false, null);
     recoveryService.start();
-    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
-    Path dagRecoveryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath);
-    touchFile(dagRecoveryPath);
+    touchFile(summaryPath);
     assertFalse(recoveryService.hasRecoveryFailed());
     recoveryService.handle(new DAGHistoryEvent(dagId,
         new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
@@ -146,38 +183,183 @@ public class TestRecoveryService {
     recoveryService.handle(new DAGHistoryEvent(dagId,
         new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
             appAttemptId, null)));
+    recoveryService.stop();
   }
 
   @Test(timeout = 5000)
-  public void testRecoveryPathExisted() throws IOException {
-    Configuration conf = new Configuration();
-    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    AppContext appContext = mock(AppContext.class);
-    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
-    when(appContext.getClock()).thenReturn(new SystemClock());
-    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
-    when(appContext.getApplicationID()).thenReturn(appId);
-
-    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
-    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
-    recoveryService.init(conf);
+  public void testRecoveryPathExisted() throws Exception {
+    setup(false, null);
     recoveryService.start();
-    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
-    Path dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(recoveryService.recoveryPath, dagId.toString());
     touchFile(dagRecoveryPath);
     assertFalse(recoveryService.hasRecoveryFailed());
     recoveryService.handle(new DAGHistoryEvent(dagId,
-        new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+        new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
     // wait for recovery event to be handled
     recoveryService.await();
     assertTrue(recoveryService.hasRecoveryFailed());
     // be able to handle recovery event after fatal error
     recoveryService.handle(new DAGHistoryEvent(dagId,
-        new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+        new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
+    recoveryService.stop();
+  }
+
+  @Test(timeout=5000)
+  public void testRecoveryFlushOnMaxEvents() throws Exception {
+    setup(true, new String[][] {
+        {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "10"},
+        {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"}
+      });
+    recoveryService.start();
+
+    // Send one event fewer than the max-unflushed-events limit (10), then wait for drain.
+    for (int i = 0; i < 9; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    }
+    waitForDrain(-1);
+    verify(dagFos, times(0)).hflush();
+
+    // This event should cause the flush.
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    waitForDrain(-1);
+    verify(dagFos, times(1)).hflush();
+
+    recoveryService.stop();
+  }
+
+  @Test(timeout=10000)
+  public void testRecoveryFlushOnTimeoutEvents() throws Exception {
+    setup(true, new String[][] {
+      {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"},
+      {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "5"}
+    });
+    recoveryService.start();
+
+    // Send a lot of events.
+    for (int i = 0; i < TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    }
+    // wait for timeout.
+    Thread.sleep(5000);
+    assertTrue(recoveryService.eventQueue.isEmpty());
+    verify(fs, times(1)).create(eq(dagRecoveryPath), eq(false), anyInt());
+    verify(dagFos, times(0)).hflush();
+
+    // The flush is triggered by sending 1 event after the timeout.
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    waitForDrain(1000);
+    verify(dagFos, times(1)).hflush();
+
+    recoveryService.stop();
+  }
+
+  @Test(timeout=10000)
+  public void testRecoveryFlush() throws Exception {
+    setup(true, new String[][] {
+      {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "10"},
+      {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "5"}
+    });
+    recoveryService.start();
+
+    // 5 second flush
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    Thread.sleep(5000);
+    assertTrue(recoveryService.eventQueue.isEmpty());
+    verify(fs, times(1)).create(eq(dagRecoveryPath), eq(false), anyInt());
+    verify(dagFos, times(0)).hflush();
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    waitForDrain(1000);
+    verify(dagFos, times(1)).hflush();
+
+    // Flush triggered by the number of unflushed events.
+    for (int i = 0; i < 9; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    }
+    waitForDrain(-1);
+    verify(dagFos, times(1)).hflush();
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    waitForDrain(-1);
+    verify(dagFos, times(2)).hflush();
+
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+
+    recoveryService.stop();
+  }
+
+  @Test(timeout=50000)
+  public void testRecoveryFlushOnStop() throws Exception {
+    setup(true, new String[][] {
+      {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"},
+      {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"}
+    });
+    recoveryService.start();
+
+    // Does not flush on event counts.
+    for (int i = 0; i < TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    }
+    waitForDrain(-1);
+    verify(dagFos, times(0)).hflush();
+
+    // Does not flush on timeout.
+    Thread.sleep(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT * 1000);
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+    waitForDrain(-1);
+    verify(dagFos, times(0)).hflush();
+
+    // Does flush on stop.
+    recoveryService.stop();
+    verify(dagFos, times(1)).hflush();
+  }
+
+  @Test(timeout=5000)
+  public void testRecoveryFlushOnSummaryEvent() throws Exception {
+    setup(true, new String[][] {
+      {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"},
+      {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"}
+    });
+    recoveryService.start();
+
+    DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build();
+    // This writes to recovery immediately.
+    recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(
+        dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null)));
+    waitForDrain(-1);
+    verify(summaryFos, times(1)).hflush();
+    verify(dagFos, times(1)).hflush();
+
+    // This flushes the summary stream immediately, but not the DAG recovery stream.
+    recoveryService.handle(new DAGHistoryEvent(dagId, new DAGCommitStartedEvent(dagId, startTime)));
+    waitForDrain(-1);
+    verify(summaryFos, times(2)).hflush();
+    verify(dagFos, times(1)).hflush();
+
+    // Does flush on stop.
+    recoveryService.stop();
+    verify(dagFos, times(2)).hflush();
+  }
+
+  private void waitForDrain(int limit) throws Exception {
+    long maxTime = System.currentTimeMillis() + limit;
+    while (!recoveryService.eventQueue.isEmpty()) {
+      Thread.sleep(10);
+      if (limit != -1 && System.currentTimeMillis() > maxTime) {
+        break;
+      }
+    }
   }
 
   private void touchFile(Path path) throws IOException {
-    FileSystem fs = FileSystem.getLocal(new Configuration());
     fs.create(path).close();
   }
 
@@ -196,4 +378,76 @@ public class TestRecoveryService {
       processedRecoveryEventCounter.addAndGet(1);
     }
   }
+
+  // Public access to ensure it can be created through reflection.
+  public static class MockFileSystem extends FileSystem {
+    // Should be set to a mock fs in the test; the delegate is static, so only one instance of this class can run at a time.
+    static FileSystem delegate;
+
+    static URI uri = URI.create("mockfs:///");
+
+    @Override
+    public URI getUri() {
+      return uri;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      return delegate.open(f, bufferSize);
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+      return delegate.create(f, overwrite, bufferSize);
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+        int bufferSize, short replication, long blockSize, Progressable progress)
+        throws IOException {
+      return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize,
+          progress);
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+        throws IOException {
+      return delegate.append(f, bufferSize, progress);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      return delegate.rename(src, dst);
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      return delegate.delete(f, recursive);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+      return delegate.listStatus(f);
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+      delegate.setWorkingDirectory(new_dir);
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return delegate.getWorkingDirectory();
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      return delegate.mkdirs(f, permission);
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return delegate.getFileStatus(f);
+    }
+  }
 }