You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/21 20:20:48 UTC
[02/10] git commit: TAJO-1046: Remove hadoop native dependency of
pullserver. (jinho)
TAJO-1046: Remove hadoop native dependency of pullserver. (jinho)
Closes #143
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/469820db
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/469820db
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/469820db
Branch: refs/heads/block_iteration
Commit: 469820db19aedcf10ea41807e4cbc7ade48d780c
Parents: 1cff979
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 15:22:05 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 15:22:05 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../pullserver/FadvisedFileRegionWrapper.java | 35 ----
.../pullserver/listener/FileCloseListener.java | 53 ------
.../pullserver/FadvisedFileRegionWrapper.java | 34 ----
.../pullserver/listener/FileCloseListener.java | 55 ------
.../pullserver/FadvisedFileRegionWrapper.java | 36 ----
.../pullserver/listener/FileCloseListener.java | 55 ------
.../tajo/pullserver/FadvisedChunkedFile.java | 81 +++++++++
.../tajo/pullserver/FadvisedFileRegion.java | 170 +++++++++++++++++++
.../tajo/pullserver/FileCloseListener.java | 53 ++++++
.../tajo/pullserver/PullServerAuxService.java | 4 +-
.../apache/tajo/pullserver/PullServerUtil.java | 90 ++++++++++
.../tajo/pullserver/TajoPullServerService.java | 9 +-
13 files changed, 400 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 026f020..a5f31f9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-1046: Remove hadoop native dependency of pullserver. (jinho)
+
TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
TAJO-1034: Reduce Explicit Use of JVM Internal Class. (Jihun Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
deleted file mode 100644
index 335dff0..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.hadoop.io.ReadaheadPool;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-@Deprecated
-public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion {
-
-
- public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count,
- boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier) throws IOException {
- super(file, position, count, manageOsCache, readaheadLength, readaheadPool, identifier);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
deleted file mode 100644
index 2e36644..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.listener;
-
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.tajo.pullserver.TajoPullServerService;
-
-@Deprecated
-public class FileCloseListener implements ChannelFutureListener {
-
- private FadvisedFileRegion filePart;
- private String requestUri;
- private TajoPullServerService pullServerService;
- private long startTime;
-
- public FileCloseListener(FadvisedFileRegion filePart,
- String requestUri,
- long startTime,
- TajoPullServerService pullServerService) {
- this.filePart = filePart;
- this.requestUri = requestUri;
- this.pullServerService = pullServerService;
- this.startTime = startTime;
- }
-
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- filePart.releaseExternalResources();
- if (pullServerService != null) {
- pullServerService.completeFileChunk(filePart, requestUri, startTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
deleted file mode 100644
index 42d1cd8..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.hadoop.io.ReadaheadPool;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion {
-
-
- public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count,
- boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier) throws IOException {
- super(file, position, count, manageOsCache, readaheadLength, readaheadPool, identifier);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
deleted file mode 100644
index be599c3..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.listener;
-
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-public class FileCloseListener implements ChannelFutureListener {
-
- private FadvisedFileRegion filePart;
- private String requestUri;
- private TajoPullServerService pullServerService;
- private long startTime;
-
- public FileCloseListener(FadvisedFileRegion filePart,
- String requestUri,
- long startTime,
- TajoPullServerService pullServerService) {
- this.filePart = filePart;
- this.requestUri = requestUri;
- this.pullServerService = pullServerService;
- this.startTime = startTime;
- }
-
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- if(future.isSuccess()){
- filePart.transferSuccessful();
- }
- filePart.releaseExternalResources();
- if (pullServerService != null) {
- pullServerService.completeFileChunk(filePart, requestUri, startTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
deleted file mode 100644
index ee53bc6..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/FadvisedFileRegionWrapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.hadoop.io.ReadaheadPool;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/* tajo is not yet supported on Windows */
-public class FadvisedFileRegionWrapper extends org.apache.hadoop.mapred.FadvisedFileRegion {
- public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
-
- public FadvisedFileRegionWrapper(RandomAccessFile file, long position, long count,
- boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier) throws IOException {
- super(file, position, count, manageOsCache, readaheadLength, readaheadPool,
- identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
deleted file mode 100644
index 7d4ca3a..0000000
--- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.listener;
-
-import org.apache.hadoop.mapred.FadvisedFileRegion;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.tajo.pullserver.TajoPullServerService;
-
-public class FileCloseListener implements ChannelFutureListener {
-
- private FadvisedFileRegion filePart;
- private String requestUri;
- private TajoPullServerService pullServerService;
- private long startTime;
-
- public FileCloseListener(FadvisedFileRegion filePart,
- String requestUri,
- long startTime,
- TajoPullServerService pullServerService) {
- this.filePart = filePart;
- this.requestUri = requestUri;
- this.pullServerService = pullServerService;
- this.startTime = startTime;
- }
-
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- if(future.isSuccess()){
- filePart.transferSuccessful();
- }
- filePart.releaseExternalResources();
- if (pullServerService != null) {
- pullServerService.completeFileChunk(filePart, requestUri, startTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
new file mode 100644
index 0000000..b0b8d18
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+
+ private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+ public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+ int chunkSize, boolean manageOsCache, int readaheadLength,
+ ReadaheadPool readaheadPool, String identifier) throws IOException {
+ super(file, position, count, chunkSize);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Object nextChunk() throws Exception {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool
+ .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+ getEndOffset(), readaheadRequest);
+ }
+ return super.nextChunk();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ try {
+ PullServerUtil.posixFadviseIfPossible(identifier,
+ fd,
+ getStartOffset(), getEndOffset() - getStartOffset(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ super.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
new file mode 100644
index 0000000..18cf4b6
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+ private final long count;
+ private final long position;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final FileChannel fileChannel;
+
+ private ReadaheadPool.ReadaheadRequest readaheadRequest;
+ public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+ boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+ String identifier) throws IOException {
+ this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
+ identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
+ }
+
+ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+ boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+ String identifier, int shuffleBufferSize,
+ boolean shuffleTransferToAllowed) throws IOException {
+ super(file.getChannel(), position, count);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ this.fileChannel = file.getChannel();
+ this.count = count;
+ this.position = position;
+ this.shuffleBufferSize = shuffleBufferSize;
+ this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position)
+ throws IOException {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+ getPosition() + position, readaheadLength,
+ getPosition() + getCount(), readaheadRequest);
+ }
+
+ if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+ } else {
+ return customShuffleTransfer(target, position);
+ }
+ }
+
+ /**
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
+ * buffer to the target. This is used only if transferTo is disallowed in
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
+ * buffer.
+ */
+ @VisibleForTesting
+ long customShuffleTransfer(WritableByteChannel target, long position)
+ throws IOException {
+ long actualCount = this.count - position;
+ if (actualCount < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position +
+ " (expected: 0 - " + (this.count - 1) + ')');
+ }
+ if (actualCount == 0) {
+ return 0L;
+ }
+
+ long trans = actualCount;
+ int readSize;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+ while(trans > 0L &&
+ (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+ //adjust counters and buffer limit
+ if(readSize < trans) {
+ trans -= readSize;
+ position += readSize;
+ byteBuffer.flip();
+ } else {
+ //We can read more than we need if the actualCount is not multiple
+ //of the byteBuffer size and file is big enough. In that case we cannot
+ //use flip method but we need to set buffer limit manually to trans.
+ byteBuffer.limit((int)trans);
+ byteBuffer.position(0);
+ position += trans;
+ trans = 0;
+ }
+
+ //write data to the target
+ while(byteBuffer.hasRemaining()) {
+ target.write(byteBuffer);
+ }
+
+ byteBuffer.clear();
+ }
+
+ return actualCount - trans;
+ }
+
+
+ @Override
+ public void releaseExternalResources() {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ super.releaseExternalResources();
+ }
+
+ /**
+ * Call when the transfer completes successfully so we can advise the OS that
+ * we don't need the region to be cached anymore.
+ */
+ public void transferSuccessful() {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
+ try {
+ PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
new file mode 100644
index 0000000..236db89
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+public class FileCloseListener implements ChannelFutureListener {
+
+ private FadvisedFileRegion filePart;
+ private String requestUri;
+ private TajoPullServerService pullServerService;
+ private long startTime;
+
+ public FileCloseListener(FadvisedFileRegion filePart,
+ String requestUri,
+ long startTime,
+ TajoPullServerService pullServerService) {
+ this.filePart = filePart;
+ this.requestUri = requestUri;
+ this.pullServerService = pullServerService;
+ this.startTime = startTime;
+ }
+
+ // TODO error handling; distinguish IO/connection failures,
+ // attribute to appropriate spill output
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if(future.isSuccess()){
+ filePart.transferSuccessful();
+ }
+ filePart.releaseExternalResources();
+ if (pullServerService != null) {
+ pullServerService.completeFileChunk(filePart, requestUri, startTime);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index e6e7ce3..5f9f9e8 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -48,7 +47,6 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.listener.FileCloseListener;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
@@ -489,7 +487,7 @@ public class PullServerAuxService extends AuxiliaryService {
}
ChannelFuture writeFuture;
if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegionWrapper partition = new FadvisedFileRegionWrapper(spill,
+ final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
file.startOffset, file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
writeFuture = ch.write(partition);
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
new file mode 100644
index 0000000..564950f
--- /dev/null
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Method;
+
+public class PullServerUtil {
+ private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
+
+ private static boolean nativeIOPossible = false;
+ private static Method posixFadviseIfPossible;
+
+ static {
+ if (NativeIO.isAvailable() && loadNativeIO()) {
+ nativeIOPossible = true;
+ } else {
+ LOG.warn("Unable to load hadoop nativeIO");
+ }
+ }
+
+ public static boolean isNativeIOPossible() {
+ return nativeIOPossible;
+ }
+
+ /**
+ * Call posix_fadvise on the given file descriptor. See the manpage
+ * for this syscall for more information. On systems where this
+ * call is not available, does nothing.
+ */
+ public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd,
+ long offset, long len, int flags) {
+ if (nativeIOPossible) {
+ try {
+ posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+ } catch (Throwable t) {
+ nativeIOPossible = false;
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ }
+
+ /* load hadoop native method if possible */
+ private static boolean loadNativeIO() {
+ boolean loaded = true;
+ if (nativeIOPossible) return loaded;
+
+ Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
+ try {
+ Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
+ Class posixClass;
+ if (getCacheManipulator != null) {
+ Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
+ posixClass = posix.getClass();
+ } else {
+ posixClass = NativeIO.POSIX.class;
+ }
+ posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
+ } catch (Throwable e) {
+ loaded = false;
+ LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage());
+ }
+
+ if (posixFadviseIfPossible == null) {
+ loaded = false;
+ }
+ return loaded;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/469820db/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index f7bc489..720f0ca 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.mapred.FadvisedChunkedFile;
-import org.apache.hadoop.mapred.FadvisedFileRegion;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -47,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.listener.FileCloseListener;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.HashShuffleAppenderManager;
@@ -384,7 +381,7 @@ public class TajoPullServerService extends AbstractService {
Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>();
- public void completeFileChunk(FadvisedFileRegion filePart,
+ public void completeFileChunk(FileRegion filePart,
String requestUri,
long startTime) {
ProcessingStatus status = processingStatusMap.get(requestUri);
@@ -412,7 +409,7 @@ public class TajoPullServerService extends AbstractService {
this.numFiles = numFiles;
this.remainFiles = new AtomicInteger(numFiles);
}
- public void decrementRemainFiles(FadvisedFileRegion filePart, long fileStartTime) {
+ public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
synchronized(remainFiles) {
long fileSendTime = System.currentTimeMillis() - fileStartTime;
if (fileSendTime > 20 * 1000) {
@@ -649,7 +646,7 @@ public class TajoPullServerService extends AbstractService {
try {
spill = new RandomAccessFile(file.getFile(), "r");
if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill,
+ final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
file.startOffset, file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
writeFuture = ch.write(filePart);