You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/21 20:20:48 UTC

[02/10] git commit: TAJO-1046: Remove hadoop native dependency of pullserver. (jinho)

TAJO-1046: Remove hadoop native dependency of pullserver. (jinho)

Closes #143


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/469820db
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/469820db
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/469820db

Branch: refs/heads/block_iteration
Commit: 469820db19aedcf10ea41807e4cbc7ade48d780c
Parents: 1cff979
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 15:22:05 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 15:22:05 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../pullserver/FadvisedFileRegionWrapper.java   |  35 ----
 .../pullserver/listener/FileCloseListener.java  |  53 ------
 .../pullserver/FadvisedFileRegionWrapper.java   |  34 ----
 .../pullserver/listener/FileCloseListener.java  |  55 ------
 .../pullserver/FadvisedFileRegionWrapper.java   |  36 ----
 .../pullserver/listener/FileCloseListener.java  |  55 ------
 .../tajo/pullserver/FadvisedChunkedFile.java    |  81 +++++++++
 .../tajo/pullserver/FadvisedFileRegion.java     | 170 +++++++++++++++++++
 .../tajo/pullserver/FileCloseListener.java      |  53 ++++++
 .../tajo/pullserver/PullServerAuxService.java   |   4 +-
 .../apache/tajo/pullserver/PullServerUtil.java  |  90 ++++++++++
 .../tajo/pullserver/TajoPullServerService.java  |   9 +-
 13 files changed, 400 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 026f020..a5f31f9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1046: Remove hadoop native dependency of pullserver. (jinho)
+
     TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
 
     TAJO-1034: Reduce Explicit Use of JVM Internal Class. (Jihun Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
deleted file mode 100644
index 335dff0..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.hadoop.io.ReadaheadPool;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-@Deprecated
-public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion {
-
-
-  public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count,
-                                   boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-                                   String identifier) throws IOException {
-    super(file, position, count, manageOsCache, readaheadLength, readaheadPool, identifier);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
deleted file mode 100644
index 2e36644..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.listener;
-
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.tajo.pullserver.TajoPullServerService;
-
-@Deprecated
-public class FileCloseListener implements ChannelFutureListener {
-
-  private FadvisedFileRegion filePart;
-  private String requestUri;
-  private TajoPullServerService pullServerService;
-  private long startTime;
-
-  public FileCloseListener(FadvisedFileRegion filePart,
-                           String requestUri,
-                           long startTime,
-                           TajoPullServerService pullServerService) {
-    this.filePart = filePart;
-    this.requestUri = requestUri;
-    this.pullServerService = pullServerService;
-    this.startTime = startTime;
-  }
-
-  // TODO error handling; distinguish IO/connection failures,
-  //      attribute to appropriate spill output
-  @Override
-  public void operationComplete(ChannelFuture future) {
-    filePart.releaseExternalResources();
-    if (pullServerService != null) {
-      pullServerService.completeFileChunk(filePart, requestUri, startTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
deleted file mode 100644
index 42d1cd8..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.hadoop.io.ReadaheadPool;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion {
-
-
-  public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count,
-                                   boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-                                   String identifier) throws IOException {
-    super(file, position, count, manageOsCache, readaheadLength, readaheadPool, identifier);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
deleted file mode 100644
index be599c3..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.listener;
-
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-public class FileCloseListener implements ChannelFutureListener {
-
-  private FadvisedFileRegion filePart;
-  private String requestUri;
-  private TajoPullServerService pullServerService;
-  private long startTime;
-
-  public FileCloseListener(FadvisedFileRegion filePart,
-                           String requestUri,
-                           long startTime,
-                           TajoPullServerService pullServerService) {
-    this.filePart = filePart;
-    this.requestUri = requestUri;
-    this.pullServerService = pullServerService;
-    this.startTime = startTime;
-  }
-
-  // TODO error handling; distinguish IO/connection failures,
-  //      attribute to appropriate spill output
-  @Override
-  public void operationComplete(ChannelFuture future) {
-    if(future.isSuccess()){
-      filePart.transferSuccessful();
-    }
-    filePart.releaseExternalResources();
-    if (pullServerService != null) {
-      pullServerService.completeFileChunk(filePart, requestUri, startTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
deleted file mode 100644
index ee53bc6..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.hadoop.io.ReadaheadPool;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/* tajo is not yet supported on Windows */
-public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion {
-  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
-
-  public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count,
-                                   boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-                                   String identifier) throws IOException {
-    super(file, position, count, manageOsCache, readaheadLength, readaheadPool,
-        identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
deleted file mode 100644
index 7d4ca3a..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.listener;
-
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.tajo.pullserver.TajoPullServerService;
-
-public class FileCloseListener implements ChannelFutureListener {
-
-  private FadvisedFileRegion filePart;
-  private String requestUri;
-  private TajoPullServerService pullServerService;
-  private long startTime;
-
-  public FileCloseListener(FadvisedFileRegion filePart,
-                           String requestUri,
-                           long startTime,
-                           TajoPullServerService pullServerService) {
-    this.filePart = filePart;
-    this.requestUri = requestUri;
-    this.pullServerService = pullServerService;
-    this.startTime = startTime;
-  }
-
-  // TODO error handling; distinguish IO/connection failures,
-  //      attribute to appropriate spill output
-  @Override
-  public void operationComplete(ChannelFuture future) {
-    if(future.isSuccess()){
-      filePart.transferSuccessful();
-    }
-    filePart.releaseExternalResources();
-    if (pullServerService != null) {
-      pullServerService.completeFileChunk(filePart, requestUri, startTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
new file mode 100644
index 0000000..b0b8d18
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+                             int chunkSize, boolean manageOsCache, int readaheadLength,
+                             ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier,
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
new file mode 100644
index 0000000..18cf4b6
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final FileChannel fileChannel;
+
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier) throws IOException {
+    this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
+        identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
+  }
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier, int shuffleBufferSize,
+                            boolean shuffleTransferToAllowed) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    }
+  }
+
+  /**
+   * This method transfers data using local buffer. It transfers data from
+   * a disk to a local buffer in memory, and then it transfers data from the
+   * buffer to the target. This is used only if transferTo is disallowed in
+   * the configuration file. super.transferTo does not perform well on Windows
+   * because it generates small IO requests. customShuffleTransfer can control
+   * the size of the IO requests by changing the size of the intermediate
+   * buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+              " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if actualCount is not a multiple
+        //of the byteBuffer size and the file is big enough. In that case we cannot
+        //use the flip method; we need to set the buffer limit manually to trans.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans;
+        trans = 0;
+      }
+
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+
+      byteBuffer.clear();
+    }
+
+    return actualCount - trans;
+  }
+
+
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    super.releaseExternalResources();
+  }
+
+  /**
+   * Call when the transfer completes successfully so we can advise the OS that
+   * we don't need the region to be cached anymore.
+   */
+  public void transferSuccessful() {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
new file mode 100644
index 0000000..236db89
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+public class FileCloseListener implements ChannelFutureListener {
+
+  private FadvisedFileRegion filePart;
+  private String requestUri;
+  private TajoPullServerService pullServerService;
+  private long startTime;
+
+  public FileCloseListener(FadvisedFileRegion filePart,
+                           String requestUri,
+                           long startTime,
+                           TajoPullServerService pullServerService) {
+    this.filePart = filePart;
+    this.requestUri = requestUri;
+    this.pullServerService = pullServerService;
+    this.startTime = startTime;
+  }
+
+  // TODO error handling; distinguish IO/connection failures,
+  //      attribute to appropriate spill output
+  @Override
+  public void operationComplete(ChannelFuture future) {
+    if(future.isSuccess()){
+      filePart.transferSuccessful();
+    }
+    filePart.releaseExternalResources();
+    if (pullServerService != null) {
+      pullServerService.completeFileChunk(filePart, requestUri, startTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index e6e7ce3..5f9f9e8 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -48,7 +47,6 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.listener.FileCloseListener;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
@@ -489,7 +487,7 @@ public class PullServerAuxService extends AuxiliaryService {
       }
       ChannelFuture writeFuture;
       if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegionWrapper partition = new FadvisedFileRegionWrapper(spill,
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
             file.startOffset, file.length(), manageOsCache, readaheadLength,
             readaheadPool, file.getFile().getAbsolutePath());
         writeFuture = ch.write(partition);

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
new file mode 100644
index 0000000..564950f
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Method;
+
+public class PullServerUtil {
+  private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
+
+  private static boolean nativeIOPossible = false;
+  private static Method posixFadviseIfPossible;
+
+  static {
+    if (NativeIO.isAvailable() && loadNativeIO()) {
+      nativeIOPossible = true;
+    } else {
+      LOG.warn("Unable to load hadoop nativeIO");
+    }
+  }
+
+  public static boolean isNativeIOPossible() {
+    return nativeIOPossible;
+  }
+
+  /**
+   * Call posix_fadvise on the given file descriptor. See the manpage
+   * for this syscall for more information. On systems where this
+   * call is not available, does nothing.
+   */
+  public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd,
+                                            long offset, long len, int flags) {
+    if (nativeIOPossible) {
+      try {
+        posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+      } catch (Throwable t) {
+        nativeIOPossible = false;
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+
+  /* load hadoop native method if possible */
+  private static boolean loadNativeIO() {
+    boolean loaded = true;
+    if (nativeIOPossible) return loaded;
+
+    Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
+    try {
+      Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
+      Class posixClass;
+      if (getCacheManipulator != null) {
+        Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
+        posixClass = posix.getClass();
+      } else {
+        posixClass = NativeIO.POSIX.class;
+      }
+      posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
+    } catch (Throwable e) {
+      loaded = false;
+      LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage());
+    }
+
+    if (posixFadviseIfPossible == null) {
+      loaded = false;
+    }
+    return loaded;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index f7bc489..720f0ca 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
-import org.apache.hadoop.mapred.FadvisedFileRegion;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -47,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.listener.FileCloseListener;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
@@ -384,7 +381,7 @@ public class TajoPullServerService extends AbstractService {
 
   Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>();
 
-  public void completeFileChunk(FadvisedFileRegion filePart,
+  public void completeFileChunk(FileRegion filePart,
                                    String requestUri,
                                    long startTime) {
     ProcessingStatus status = processingStatusMap.get(requestUri);
@@ -412,7 +409,7 @@ public class TajoPullServerService extends AbstractService {
       this.numFiles = numFiles;
       this.remainFiles = new AtomicInteger(numFiles);
     }
-    public void decrementRemainFiles(FadvisedFileRegion filePart, long fileStartTime) {
+    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
       synchronized(remainFiles) {
         long fileSendTime = System.currentTimeMillis() - fileStartTime;
         if (fileSendTime > 20 * 1000) {
@@ -649,7 +646,7 @@ public class TajoPullServerService extends AbstractService {
       try {
         spill = new RandomAccessFile(file.getFile(), "r");
         if (ch.getPipeline().get(SslHandler.class) == null) {
-          final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill,
+          final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
               file.startOffset, file.length(), manageOsCache, readaheadLength,
               readaheadPool, file.getFile().getAbsolutePath());
           writeFuture = ch.write(filePart);