You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/09/19 22:51:00 UTC
tez git commit: TEZ-3434. Add unit tests for flushing of recovery
events. (Harish Jaiprakash via hitesh)
Repository: tez
Updated Branches:
refs/heads/master b17edc401 -> 9cf25d142
TEZ-3434. Add unit tests for flushing of recovery events. (Harish Jaiprakash via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9cf25d14
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9cf25d14
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9cf25d14
Branch: refs/heads/master
Commit: 9cf25d142b4c30c7d903c1ae03bca7b070b706b0
Parents: b17edc4
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Sep 19 15:50:22 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Sep 19 15:50:22 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/history/recovery/RecoveryService.java | 3 +-
.../history/recovery/TestRecoveryService.java | 390 +++++++++++++++----
3 files changed, 325 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed0ef7b..c7f540b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3434. Add unit tests for flushing of recovery events.
TEZ-3317. Speculative execution starts too early due to 0 progress.
TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list.
http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 3eeddf5..8c29172 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -69,7 +69,8 @@ public class RecoveryService extends AbstractService {
@VisibleForTesting
public static final boolean TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT = true;
- private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+ @VisibleForTesting
+ LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index d828d6b..3dec1d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -17,90 +17,140 @@
*/
package org.apache.tez.dag.history.recovery;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
-import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
public class TestRecoveryService {
- private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestRecoveryService.class.getName() + "-tmpDir";
+ private static final long startTime = System.currentTimeMillis();
+ private static final ApplicationId appId = ApplicationId.newInstance(startTime, 1);
+ private static final ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ private static final TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ private static final TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
+ private static final TezTaskID tezTaskId = TezTaskID.getInstance(vertexId, 1);
+
private Configuration conf;
+ private AppContext appContext;
+ private MockRecoveryService recoveryService;
+ private Path dagRecoveryPath;
+ private Path summaryPath;
+ private FileSystem fs;
+ private FSDataOutputStream dagFos;
+ private FSDataOutputStream summaryFos;
- @Before
- public void setUp() throws IllegalArgumentException, IOException {
- this.conf = new Configuration();
- FileSystem localFS = FileSystem.getLocal(conf);
- localFS.delete(new Path(TEST_ROOT_DIR), true);
- }
+ private void setup(boolean useMockFs, String[][] configs) throws Exception {
+ conf = new Configuration();
+ if (configs != null) {
+ for (String[] config : configs) {
+ conf.set(config[0], config[1]);
+ }
+ }
- @Test(timeout = 5000)
- public void testDrainEvents() throws IOException {
- Configuration conf = new Configuration();
- AppContext appContext = mock(AppContext.class);
- when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+ appContext = mock(AppContext.class);
when(appContext.getClock()).thenReturn(new SystemClock());
when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
- ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
when(appContext.getApplicationID()).thenReturn(appId);
- MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+ if (useMockFs) {
+ fs = mock(FileSystem.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path("mockfs:///"));
+ conf.set("fs.mockfs.impl", MockFileSystem.class.getName());
+ MockFileSystem.delegate = fs;
+ dagFos = spy(new FSDataOutputStream(new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {}
+ }, null));
+ summaryFos = spy(new FSDataOutputStream(new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {}
+ }, null));
+ } else {
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+ fs = FileSystem.getLocal(conf);
+ fs.delete(new Path(TEST_ROOT_DIR), true);
+ }
+
+ recoveryService = new MockRecoveryService(appContext);
conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
recoveryService.init(conf);
+
+ summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath);
+ dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(
+ recoveryService.recoveryPath, dagId.toString());
+ if (useMockFs) {
+ when(fs.create(eq(dagRecoveryPath), eq(false), anyInt())).thenReturn(dagFos);
+ when(fs.create(eq(summaryPath), eq(false), anyInt())).thenReturn(summaryFos);
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testDrainEvents() throws Exception {
+ setup(false, null);
recoveryService.start();
- TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
int randEventCount = new Random().nextInt(100) + 100;
for (int i=0; i< randEventCount; ++i) {
recoveryService.handle(new DAGHistoryEvent(dagId,
- new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+ new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
}
recoveryService.stop();
assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get());
}
@Test(timeout = 5000)
- public void testMultipleDAGFinishedEvent() throws IOException {
- Configuration conf = new Configuration();
- ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- AppContext appContext = mock(AppContext.class);
- when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
- when(appContext.getClock()).thenReturn(new SystemClock());
- when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
- when(appContext.getApplicationID()).thenReturn(appId);
-
- MockRecoveryService recoveryService = new MockRecoveryService(appContext);
- conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
- recoveryService.init(conf);
+ public void testMultipleDAGFinishedEvent() throws Exception {
+ setup(false, null);
recoveryService.start();
- TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
int randEventCount = new Random().nextInt(100) + 100;
for (int i=0; i< randEventCount; ++i) {
recoveryService.handle(new DAGHistoryEvent(dagId,
- new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+ new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
}
recoveryService.await();
assertTrue(recoveryService.outputStreamMap.containsKey(dagId));
@@ -120,23 +170,10 @@ public class TestRecoveryService {
}
@Test(timeout = 5000)
- public void testSummaryPathExisted() throws IOException {
- Configuration conf = new Configuration();
- ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- AppContext appContext = mock(AppContext.class);
- when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
- when(appContext.getClock()).thenReturn(new SystemClock());
- when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
- when(appContext.getApplicationID()).thenReturn(appId);
-
- MockRecoveryService recoveryService = new MockRecoveryService(appContext);
- conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
- recoveryService.init(conf);
+ public void testSummaryPathExisted() throws Exception {
+ setup(false, null);
recoveryService.start();
- TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
- Path dagRecoveryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath);
- touchFile(dagRecoveryPath);
+ touchFile(summaryPath);
assertFalse(recoveryService.hasRecoveryFailed());
recoveryService.handle(new DAGHistoryEvent(dagId,
new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
@@ -146,38 +183,183 @@ public class TestRecoveryService {
recoveryService.handle(new DAGHistoryEvent(dagId,
new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null,
appAttemptId, null)));
+ recoveryService.stop();
}
@Test(timeout = 5000)
- public void testRecoveryPathExisted() throws IOException {
- Configuration conf = new Configuration();
- ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
- AppContext appContext = mock(AppContext.class);
- when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
- when(appContext.getClock()).thenReturn(new SystemClock());
- when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
- when(appContext.getApplicationID()).thenReturn(appId);
-
- MockRecoveryService recoveryService = new MockRecoveryService(appContext);
- conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
- recoveryService.init(conf);
+ public void testRecoveryPathExisted() throws Exception {
+ setup(false, null);
recoveryService.start();
- TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1);
- Path dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(recoveryService.recoveryPath, dagId.toString());
touchFile(dagRecoveryPath);
assertFalse(recoveryService.hasRecoveryFailed());
recoveryService.handle(new DAGHistoryEvent(dagId,
- new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+ new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
// wait for recovery event to be handled
recoveryService.await();
assertTrue(recoveryService.hasRecoveryFailed());
// be able to handle recovery event after fatal error
recoveryService.handle(new DAGHistoryEvent(dagId,
- new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L)));
+ new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
+ recoveryService.stop();
+ }
+
+ @Test(timeout=5000)
+ public void testRecoveryFlushOnMaxEvents() throws Exception {
+ setup(true, new String[][] {
+ {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "10"},
+ {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"}
+ });
+ recoveryService.start();
+
+ // Send 1 event less, wait for drain
+ for (int i = 0; i < 9; ++i) {
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ }
+ waitForDrain(-1);
+ verify(dagFos, times(0)).hflush();
+
+ // This event should cause the flush.
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ waitForDrain(-1);
+ verify(dagFos, times(1)).hflush();
+
+ recoveryService.stop();
+ }
+
+ @Test(timeout=10000)
+ public void testRecoveryFlushOnTimeoutEvents() throws Exception {
+ setup(true, new String[][] {
+ {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"},
+ {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "5"}
+ });
+ recoveryService.start();
+
+ // Send a lot of events.
+ for (int i = 0; i < TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT; ++i) {
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ }
+ // wait for timeout.
+ Thread.sleep(5000);
+ assertTrue(recoveryService.eventQueue.isEmpty());
+ verify(fs, times(1)).create(eq(dagRecoveryPath), eq(false), anyInt());
+ verify(dagFos, times(0)).hflush();
+
+ // The flush is triggered by sending 1 event after the timeout.
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ waitForDrain(1000);
+ verify(dagFos, times(1)).hflush();
+
+ recoveryService.stop();
+ }
+
+ @Test(timeout=10000)
+ public void testRecoveryFlush() throws Exception {
+ setup(true, new String[][] {
+ {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "10"},
+ {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "5"}
+ });
+ recoveryService.start();
+
+ // 5 second flush
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ Thread.sleep(5000);
+ assertTrue(recoveryService.eventQueue.isEmpty());
+ verify(fs, times(1)).create(eq(dagRecoveryPath), eq(false), anyInt());
+ verify(dagFos, times(0)).hflush();
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ waitForDrain(1000);
+ verify(dagFos, times(1)).hflush();
+
+ // Flush triggered by the number of unflushed events.
+ for (int i = 0; i < 9; ++i) {
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ }
+ waitForDrain(-1);
+ verify(dagFos, times(1)).hflush();
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ waitForDrain(-1);
+ verify(dagFos, times(2)).hflush();
+
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+
+ recoveryService.stop();
+ }
+
+ @Test(timeout=50000)
+ public void testRecoveryFlushOnStop() throws Exception {
+ setup(true, new String[][] {
+ {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"},
+ {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"}
+ });
+ recoveryService.start();
+
+ // Does not flush on event counts.
+ for (int i = 0; i < TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT; ++i) {
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ }
+ waitForDrain(-1);
+ verify(dagFos, times(0)).hflush();
+
+ // Does not flush on timeout.
+ Thread.sleep(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT * 1000);
+ recoveryService.handle(new DAGHistoryEvent(dagId,
+ new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
+ waitForDrain(-1);
+ verify(dagFos, times(0)).hflush();
+
+ // Does flush on stop.
+ recoveryService.stop();
+ verify(dagFos, times(1)).hflush();
+ }
+
+ @Test(timeout=5000)
+ public void testRecoveryFlushOnSummaryEvent() throws Exception {
+ setup(true, new String[][] {
+ {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"},
+ {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"}
+ });
+ recoveryService.start();
+
+ DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build();
+ // This writes to recovery immediately.
+ recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(
+ dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null)));
+ waitForDrain(-1);
+ verify(summaryFos, times(1)).hflush();
+ verify(dagFos, times(1)).hflush();
+
+ // This flushes the summary stream immediately, but not the DAG recovery stream.
+ recoveryService.handle(new DAGHistoryEvent(dagId, new DAGCommitStartedEvent(dagId, startTime)));
+ waitForDrain(-1);
+ verify(summaryFos, times(2)).hflush();
+ verify(dagFos, times(1)).hflush();
+
+ // Does flush on stop.
+ recoveryService.stop();
+ verify(dagFos, times(2)).hflush();
+ }
+
+ private void waitForDrain(int limit) throws Exception {
+ long maxTime = System.currentTimeMillis() + limit;
+ while (!recoveryService.eventQueue.isEmpty()) {
+ Thread.sleep(10);
+ if (limit != -1 && System.currentTimeMillis() > maxTime) {
+ break;
+ }
+ }
}
private void touchFile(Path path) throws IOException {
- FileSystem fs = FileSystem.getLocal(new Configuration());
fs.create(path).close();
}
@@ -196,4 +378,76 @@ public class TestRecoveryService {
processedRecoveryEventCounter.addAndGet(1);
}
}
+
+ // Public access to ensure it can be created through reflection.
+ public static class MockFileSystem extends FileSystem {
+ // Must be set to a mock fs by the test. The delegate is static, so only one test can use this class at a time.
+ static FileSystem delegate;
+
+ static URI uri = URI.create("mockfs:///");
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return delegate.open(f, bufferSize);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+ return delegate.create(f, overwrite, bufferSize);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+ int bufferSize, short replication, long blockSize, Progressable progress)
+ throws IOException {
+ return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize,
+ progress);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+ throws IOException {
+ return delegate.append(f, bufferSize, progress);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return delegate.rename(src, dst);
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return delegate.delete(f, recursive);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ return delegate.listStatus(f);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+ delegate.setWorkingDirectory(new_dir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return delegate.getWorkingDirectory();
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return delegate.mkdirs(f, permission);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return delegate.getFileStatus(f);
+ }
+ }
}