You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:43 UTC
[27/31] incubator-ignite git commit: # IGNITE-386: WIP on internal
namings (4).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java
new file mode 100644
index 0000000..ab5fa68
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutputIgfsStream.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * IGFS Hadoop output stream implementation.
+ */
+public class HadoopOutputIgfsStream extends OutputStream implements HadoopIgfsStreamEventListener {
+ /** Log instance. */
+ private Log log;
+
+ /** Client logger. */
+ private IgfsLogger clientLog;
+
+ /** Log stream ID. */
+ private long logStreamId;
+
+ /** Server stream delegate. */
+ private HadoopIgfsStreamDelegate delegate;
+
+ /** Closed flag. */
+ private volatile boolean closed;
+
+ /** Flag set if stream was closed due to connection breakage. */
+ private boolean connBroken;
+
+ /** Error message. */
+ private volatile String errMsg;
+
+ /** Write time (nanoseconds spent inside write operations). */
+ private long writeTime;
+
+ /** User time. */
+ private long userTime;
+
+ /** Last timestamp. */
+ private long lastTs;
+
+ /** Amount of written bytes. */
+ private long total;
+
+ /**
+ * Creates light output stream.
+ * @param delegate Server stream delegate.
+ * @param log Logger to use.
+ * @param clientLog Client logger.
+ * @param logStreamId Log stream ID used in client log records.
+ */
+ public HadoopOutputIgfsStream(HadoopIgfsStreamDelegate delegate, Log log,
+ IgfsLogger clientLog, long logStreamId) {
+ this.delegate = delegate;
+ this.log = log;
+ this.clientLog = clientLog;
+ this.logStreamId = logStreamId;
+
+ lastTs = System.nanoTime();
+
+ delegate.hadoop().addEventListener(delegate, this);
+ }
+
+ /**
+ * Write start: time elapsed since the last mark is accounted as user time.
+ */
+ private void writeStart() {
+ long now = System.nanoTime();
+
+ userTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /**
+ * Write end: time elapsed since the last mark is accounted as write time.
+ */
+ private void writeEnd() {
+ long now = System.nanoTime();
+
+ writeTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
+ check();
+
+ writeStart();
+
+ try {
+ delegate.hadoop().writeData(delegate, b, off, len);
+
+ total += len;
+ }
+ finally {
+ writeEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ // The bulk write already adds the array length to 'total', so no extra
+ // increment here (an additional 'total++' would double-count the byte).
+ write(new byte[] {(byte)b});
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush() throws IOException {
+ delegate.hadoop().flush(delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ if (!closed) {
+ if (log.isDebugEnabled())
+ log.debug("Closing output stream: " + delegate);
+
+ writeStart();
+
+ delegate.hadoop().closeStream(delegate);
+
+ markClosed(false);
+
+ writeEnd();
+
+ if (clientLog.isLogEnabled())
+ clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+
+ if (log.isDebugEnabled())
+ log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 +
+ ", userTime=" + userTime / 1000 + ']');
+ }
+ else if (connBroken)
+ throw new IOException(
+ "Failed to close stream, because connection was broken (data could have been lost).");
+ }
+
+ /**
+ * Marks stream as closed.
+ *
+ * @param connBroken {@code True} if connection with server was lost.
+ */
+ private void markClosed(boolean connBroken) {
+ // It is ok to have race here.
+ if (!closed) {
+ closed = true;
+
+ delegate.hadoop().removeEventListener(delegate);
+
+ this.connBroken = connBroken;
+ }
+ }
+
+ /**
+ * @throws IOException If check failed.
+ */
+ private void check() throws IOException {
+ String errMsg0 = errMsg;
+
+ if (errMsg0 != null)
+ throw new IOException(errMsg0);
+
+ if (closed) {
+ if (connBroken)
+ throw new IOException("Server connection was lost.");
+ else
+ throw new IOException("Stream is closed.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() throws IgniteCheckedException {
+ markClosed(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(String errMsg) {
+ this.errMsg = errMsg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
deleted file mode 100644
index 27d6e33..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Facade for communication with grid.
- */
-public interface IgfsHadoop {
- /**
- * Perform handshake.
- *
- * @param logDir Log directory.
- * @return Future with handshake result.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
-
- /**
- * Close connection.
- *
- * @param force Force flag.
- */
- public void close(boolean force);
-
- /**
- * Command to retrieve file info for some IGFS path.
- *
- * @param path Path to get file info for.
- * @return Future for info operation.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
-
- /**
- * Command to update file properties.
- *
- * @param path IGFS path to update properties.
- * @param props Properties to update.
- * @return Future for update operation.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
- /**
- * Sets last access time and last modification time for a file.
- *
- * @param path Path to update times.
- * @param accessTime Last access time to set.
- * @param modificationTime Last modification time to set.
- * @throws IgniteCheckedException If failed.
- */
- public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
- IOException;
-
- /**
- * Command to rename given path.
- *
- * @param src Source path.
- * @param dest Destination path.
- * @return Future for rename operation.
- * @throws IgniteCheckedException If failed.
- */
- public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
-
- /**
- * Command to delete given path.
- *
- * @param path Path to delete.
- * @param recursive {@code True} if deletion is recursive.
- * @return Future for delete operation.
- * @throws IgniteCheckedException If failed.
- */
- public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
-
- /**
- * Command to get affinity for given path, offset and length.
- *
- * @param path Path to get affinity for.
- * @param start Start position (offset).
- * @param len Data length.
- * @return Future for affinity command.
- * @throws IgniteCheckedException If failed.
- */
- public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
- IOException;
-
- /**
- * Gets path summary.
- *
- * @param path Path to get summary for.
- * @return Future that will be completed when summary is received.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
-
- /**
- * Command to create directories.
- *
- * @param path Path to create.
- * @return Future for mkdirs operation.
- * @throws IgniteCheckedException If failed.
- */
- public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
- /**
- * Command to get list of files in directory.
- *
- * @param path Path to list.
- * @return Future for listFiles operation.
- * @throws IgniteCheckedException If failed.
- */
- public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
-
- /**
- * Command to get directory listing.
- *
- * @param path Path to list.
- * @return Future for listPaths operation.
- * @throws IgniteCheckedException If failed.
- */
- public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
-
- /**
- * Performs status request.
- *
- * @return Status response.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
-
- /**
- * Command to open file for reading.
- *
- * @param path File path to open.
- * @return Future for open operation.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
-
- /**
- * Command to open file for reading.
- *
- * @param path File path to open.
- * @return Future for open operation.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
- IOException;
-
- /**
- * Command to create file and open it for output.
- *
- * @param path Path to file.
- * @param overwrite If {@code true} then old file contents will be lost.
- * @param colocate If {@code true} and called on data node, file will be written on that node.
- * @param replication Replication factor.
- * @param props File properties for creation.
- * @return Stream descriptor.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
- int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-
- /**
- * Open file for output appending data to the end of a file.
- *
- * @param path Path to file.
- * @param create If {@code true}, file will be created if does not exist.
- * @param props File properties.
- * @return Stream descriptor.
- * @throws IgniteCheckedException If failed.
- */
- public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
- @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
deleted file mode 100644
index 03bf733..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-
-/**
- * Communication exception indicating a problem between file system and IGFS instance.
- */
-public class IgfsHadoopCommunicationException extends IgniteCheckedException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Creates new exception with given throwable as a nested cause and
- * source of error message.
- *
- * @param cause Non-null throwable cause.
- */
- public IgfsHadoopCommunicationException(Exception cause) {
- super(cause);
- }
-
- /**
- * Creates a new exception with given error message and optional nested cause exception.
- *
- * @param msg Error message.
- */
- public IgfsHadoopCommunicationException(String msg) {
- super(msg);
- }
-
- /**
- * Creates a new exception with given error message and optional nested cause exception.
- *
- * @param msg Error message.
- * @param cause Cause.
- */
- public IgfsHadoopCommunicationException(String msg, Exception cause) {
- super(msg, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
deleted file mode 100644
index 35638ea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * IGFS endpoint abstraction.
- */
-public class IgfsHadoopEndpoint {
- /** Localhost. */
- public static final String LOCALHOST = "127.0.0.1";
-
- /** IGFS name. */
- private final String igfsName;
-
- /** Grid name. */
- private final String gridName;
-
- /** Host. */
- private final String host;
-
- /** Port. */
- private final int port;
-
- /**
- * Normalize IGFS URI.
- *
- * @param uri URI.
- * @return Normalized URI.
- * @throws IOException If failed.
- */
- public static URI normalize(URI uri) throws IOException {
- try {
- if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme()))
- throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri);
-
- IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority());
-
- StringBuilder sb = new StringBuilder();
-
- if (endpoint.igfs() != null)
- sb.append(endpoint.igfs());
-
- if (endpoint.grid() != null)
- sb.append(":").append(endpoint.grid());
-
- return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
- uri.getPath(), uri.getQuery(), uri.getFragment());
- }
- catch (URISyntaxException | IgniteCheckedException e) {
- throw new IOException("Failed to normalize URI: " + uri, e);
- }
- }
-
- /**
- * Constructor.
- *
- * @param connStr Connection string.
- * @throws IgniteCheckedException If failed to parse connection string.
- */
- public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException {
- if (connStr == null)
- connStr = "";
-
- String[] tokens = connStr.split("@", -1);
-
- IgniteBiTuple<String, Integer> hostPort;
-
- if (tokens.length == 1) {
- igfsName = null;
- gridName = null;
-
- hostPort = hostPort(connStr, connStr);
- }
- else if (tokens.length == 2) {
- String authStr = tokens[0];
-
- if (authStr.isEmpty()) {
- gridName = null;
- igfsName = null;
- }
- else {
- String[] authTokens = authStr.split(":", -1);
-
- igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
-
- if (authTokens.length == 1)
- gridName = null;
- else if (authTokens.length == 2)
- gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
- else
- throw new IgniteCheckedException("Invalid connection string format: " + connStr);
- }
-
- hostPort = hostPort(connStr, tokens[1]);
- }
- else
- throw new IgniteCheckedException("Invalid connection string format: " + connStr);
-
- host = hostPort.get1();
-
- assert hostPort.get2() != null;
-
- port = hostPort.get2();
- }
-
- /**
- * Parse host and port.
- *
- * @param connStr Full connection string.
- * @param hostPortStr Host/port connection string part.
- * @return Tuple with host and port.
- * @throws IgniteCheckedException If failed to parse connection string.
- */
- private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException {
- String[] tokens = hostPortStr.split(":", -1);
-
- String host = tokens[0];
-
- if (F.isEmpty(host))
- host = LOCALHOST;
-
- int port;
-
- if (tokens.length == 1)
- port = DFLT_IPC_PORT;
- else if (tokens.length == 2) {
- String portStr = tokens[1];
-
- try {
- port = Integer.valueOf(portStr);
-
- if (port < 0 || port > 65535)
- throw new IgniteCheckedException("Invalid port number: " + connStr);
- }
- catch (NumberFormatException e) {
- throw new IgniteCheckedException("Invalid port number: " + connStr);
- }
- }
- else
- throw new IgniteCheckedException("Invalid connection string format: " + connStr);
-
- return F.t(host, port);
- }
-
- /**
- * @return IGFS name.
- */
- @Nullable public String igfs() {
- return igfsName;
- }
-
- /**
- * @return Grid name.
- */
- @Nullable public String grid() {
- return gridName;
- }
-
- /**
- * @return Host.
- */
- public String host() {
- return host;
- }
-
- /**
- * @return Host.
- */
- public boolean isLocal() {
- return F.eq(LOCALHOST, host);
- }
-
- /**
- * @return Port.
- */
- public int port() {
- return port;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgfsHadoopEndpoint.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
deleted file mode 100644
index da86e37..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Extended IGFS server interface.
- */
-public interface IgfsHadoopEx extends IgfsHadoop {
- /**
- * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
- * If connection is closed already, callback will be invoked synchronously inside this method.
- *
- * @param delegate Stream delegate.
- * @param lsnr Event listener.
- */
- public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr);
-
- /**
- * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
- *
- * @param delegate Stream delegate.
- */
- public void removeEventListener(IgfsHadoopStreamDelegate delegate);
-
- /**
- * Asynchronously reads specified amount of bytes from opened input stream.
- *
- * @param delegate Stream delegate.
- * @param pos Position to read from.
- * @param len Data length to read.
- * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining
- * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will
- * be the result of read future.
- * @param outOff Output offset.
- * @param outLen Output length.
- * @return Read data.
- */
- public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len,
- @Nullable final byte[] outBuf, final int outOff, final int outLen);
-
- /**
- * Writes data to the stream with given streamId. This method does not return any future since
- * no response to write request is sent.
- *
- * @param delegate Stream delegate.
- * @param data Data to write.
- * @param off Offset.
- * @param len Length.
- * @throws IOException If failed.
- */
- public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
-
- /**
- * Close server stream.
- *
- * @param delegate Stream delegate.
- * @throws IOException If failed.
- */
- public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException;
-
- /**
- * Flush output stream.
- *
- * @param delegate Stream delegate.
- * @throws IOException If failed.
- */
- public void flush(IgfsHadoopStreamDelegate delegate) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
deleted file mode 100644
index c9d1322..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.fs.permission.*;
-import org.apache.ignite.*;
-
-import java.util.*;
-
-import static org.apache.ignite.IgniteFs.*;
-
-/**
- * Hadoop file system properties.
- */
-public class IgfsHadoopFSProperties {
- /** Username. */
- private String usrName;
-
- /** Group name. */
- private String grpName;
-
- /** Permissions. */
- private FsPermission perm;
-
- /**
- * Constructor.
- *
- * @param props Properties.
- * @throws IgniteException In case of error.
- */
- public IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException {
- usrName = props.get(PROP_USER_NAME);
- grpName = props.get(PROP_GROUP_NAME);
-
- String permStr = props.get(PROP_PERMISSION);
-
- if (permStr != null) {
- try {
- perm = new FsPermission((short)Integer.parseInt(permStr, 8));
- }
- catch (NumberFormatException ignore) {
- throw new IgniteException("Permissions cannot be parsed: " + permStr);
- }
- }
- }
-
- /**
- * Get user name.
- *
- * @return User name.
- */
- public String userName() {
- return usrName;
- }
-
- /**
- * Get group name.
- *
- * @return Group name.
- */
- public String groupName() {
- return grpName;
- }
-
- /**
- * Get permission.
- *
- * @return Permission.
- */
- public FsPermission permission() {
- return perm;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
deleted file mode 100644
index 476641c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-/**
- * IGFS client future that holds response parse closure.
- */
-public class IgfsHadoopFuture<T> extends GridPlainFutureAdapter<T> {
- /** Output buffer. */
- private byte[] outBuf;
-
- /** Output offset. */
- private int outOff;
-
- /** Output length. */
- private int outLen;
-
- /** Read future flag. */
- private boolean read;
-
- /**
- * @return Output buffer.
- */
- public byte[] outputBuffer() {
- return outBuf;
- }
-
- /**
- * @param outBuf Output buffer.
- */
- public void outputBuffer(@Nullable byte[] outBuf) {
- this.outBuf = outBuf;
- }
-
- /**
- * @return Offset in output buffer to write from.
- */
- public int outputOffset() {
- return outOff;
- }
-
- /**
- * @param outOff Offset in output buffer to write from.
- */
- public void outputOffset(int outOff) {
- this.outOff = outOff;
- }
-
- /**
- * @return Length to write to output buffer.
- */
- public int outputLength() {
- return outLen;
- }
-
- /**
- * @param outLen Length to write to output buffer.
- */
- public void outputLength(int outLen) {
- this.outLen = outLen;
- }
-
- /**
- * @param read {@code True} if this is a read future.
- */
- public void read(boolean read) {
- this.read = read;
- }
-
- /**
- * @return {@code True} if this is a read future.
- */
- public boolean read() {
- return read;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
deleted file mode 100644
index 8245125..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Communication with grid in the same process.
- */
-public class IgfsHadoopInProc implements IgfsHadoopEx {
- /** Target IGFS. */
- private final IgfsEx igfs;
-
- /** Buffer size. */
- private final int bufSize;
-
- /** Event listeners. */
- private final Map<IgfsHadoopStreamDelegate, IgfsHadoopStreamEventListener> lsnrs =
- new ConcurrentHashMap<>();
-
- /** Logger. */
- private final Log log;
-
- /**
- * Constructor.
- *
- * @param igfs Target IGFS.
- * @param log Log.
- */
- public IgfsHadoopInProc(IgfsEx igfs, Log log) {
- this.igfs = igfs;
- this.log = log;
-
- bufSize = igfs.configuration().getBlockSize() * 2;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHandshakeResponse handshake(String logDir) {
- igfs.clientLogDirectory(logDir);
-
- return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
- igfs.globalSampling());
- }
-
- /** {@inheritDoc} */
- @Override public void close(boolean force) {
- // Perform cleanup.
- for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) {
- try {
- lsnr.onClose();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to notify stream event listener", e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
- try {
- return igfs.info(path);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
- try {
- return igfs.update(path, props);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
- try {
- igfs.setTimes(path, accessTime, modificationTime);
-
- return true;
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " +
- path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
- try {
- igfs.rename(src, dest);
-
- return true;
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
- try {
- return igfs.delete(path, recursive);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
- try {
- return igfs.globalSpace();
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " +
- "stopping.");
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
- try {
- return igfs.listPaths(path);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
- try {
- return igfs.listFiles(path);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
- try {
- igfs.mkdirs(path, props);
-
- return true;
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " +
- path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
- try {
- return igfs.summary(path);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " +
- path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
- throws IgniteCheckedException {
- try {
- return igfs.affinity(path, start, len);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
- try {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
-
- return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
- throws IgniteCheckedException {
- try {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
-
- return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
- int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
- try {
- IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
- colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
-
- return new IgfsHadoopStreamDelegate(this, stream);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
- @Nullable Map<String, String> props) throws IgniteCheckedException {
- try {
- IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
-
- return new IgfsHadoopStreamDelegate(this, stream);
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len,
- @Nullable byte[] outBuf, int outOff, int outLen) {
- IgfsInputStreamAdapter stream = delegate.target();
-
- try {
- byte[] res = null;
-
- if (outBuf != null) {
- int outTailLen = outBuf.length - outOff;
-
- if (len <= outTailLen)
- stream.readFully(pos, outBuf, outOff, len);
- else {
- stream.readFully(pos, outBuf, outOff, outTailLen);
-
- int remainderLen = len - outTailLen;
-
- res = new byte[remainderLen];
-
- stream.readFully(pos, res, 0, remainderLen);
- }
- } else {
- res = new byte[len];
-
- stream.readFully(pos, res, 0, len);
- }
-
- return new GridPlainFutureAdapter<>(res);
- }
- catch (IllegalStateException | IOException e) {
- IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
-
- if (lsnr != null)
- lsnr.onError(e.getMessage());
-
- return new GridPlainFutureAdapter<>(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len)
- throws IOException {
- try {
- IgfsOutputStream stream = delegate.target();
-
- stream.write(data, off, len);
- }
- catch (IllegalStateException | IOException e) {
- IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
-
- if (lsnr != null)
- lsnr.onError(e.getMessage());
-
- if (e instanceof IllegalStateException)
- throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
- else
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException {
- try {
- IgfsOutputStream stream = delegate.target();
-
- stream.flush();
- }
- catch (IllegalStateException | IOException e) {
- IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
-
- if (lsnr != null)
- lsnr.onError(e.getMessage());
-
- if (e instanceof IllegalStateException)
- throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
- else
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException {
- Closeable closeable = desc.target();
-
- try {
- closeable.close();
- }
- catch (IllegalStateException e) {
- throw new IOException("Failed to close IGFS stream because Grid is stopping.", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void addEventListener(IgfsHadoopStreamDelegate delegate,
- IgfsHadoopStreamEventListener lsnr) {
- IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
-
- assert lsnr0 == null || lsnr0 == lsnr;
-
- if (log.isDebugEnabled())
- log.debug("Added stream event listener [delegate=" + delegate + ']');
- }
-
- /** {@inheritDoc} */
- @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) {
- IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate);
-
- if (lsnr0 != null && log.isDebugEnabled())
- log.debug("Removed stream event listener [delegate=" + delegate + ']');
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java
deleted file mode 100644
index efc5264..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java
+++ /dev/null
@@ -1,626 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
/**
 * IGFS input stream wrapper for hadoop interfaces.
 * <p>
 * Reads are served from an asynchronous double-buffered prefetch ({@link DoubleFetchBuffer}):
 * while one half of the window is being consumed, the next half is fetched from the server
 * in the background. Read, seek and skip operations are also reported to the client logger
 * when it is enabled, and read vs. user time is accumulated for the close log record.
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public final class IgfsHadoopInputStream extends InputStream implements Seekable, PositionedReadable,
    IgfsHadoopStreamEventListener {
    /** Minimum buffer size. */
    private static final int MIN_BUF_SIZE = 4 * 1024;

    /** Server stream delegate. */
    private IgfsHadoopStreamDelegate delegate;

    /** Stream ID used by logger. */
    private long logStreamId;

    /** Stream position. */
    private long pos;

    /** Stream read limit (total readable length in bytes). */
    private long limit;

    /** Mark position ({@code -1} when no mark is set). */
    private long markPos = -1;

    /** Prefetch buffer. */
    private DoubleFetchBuffer buf = new DoubleFetchBuffer();

    /** Buffer half size for double-buffering. */
    private int bufHalfSize;

    /** Closed flag. */
    private volatile boolean closed;

    /** Flag set if stream was closed due to connection breakage. */
    private boolean connBroken;

    /** Logger. */
    private Log log;

    /** Client logger. */
    private IgfsLogger clientLog;

    /** Read time (accumulated nanoseconds spent inside read operations). */
    private long readTime;

    /** User time (accumulated nanoseconds spent between read operations). */
    private long userTime;

    /** Last timestamp. */
    private long lastTs;

    /** Amount of read bytes. */
    private long total;

    /**
     * Creates input stream.
     *
     * @param delegate Server stream delegate.
     * @param limit Read limit.
     * @param bufSize Buffer size.
     * @param log Log.
     * @param clientLog Client logger.
     * @param logStreamId Stream ID used by the client logger.
     */
    public IgfsHadoopInputStream(IgfsHadoopStreamDelegate delegate, long limit, int bufSize, Log log,
        IgfsLogger clientLog, long logStreamId) {
        assert limit >= 0;

        this.delegate = delegate;
        this.limit = limit;
        this.log = log;
        this.clientLog = clientLog;
        this.logStreamId = logStreamId;

        // Each prefetch half is at least MIN_BUF_SIZE bytes.
        bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE);

        lastTs = System.nanoTime();

        delegate.hadoop().addEventListener(delegate, this);
    }

    /**
     * Read start: accounts time since the last timestamp as user (non-read) time.
     */
    private void readStart() {
        long now = System.nanoTime();

        userTime += now - lastTs;

        lastTs = now;
    }

    /**
     * Read end: accounts time since the last timestamp as read time.
     */
    private void readEnd() {
        long now = System.nanoTime();

        readTime += now - lastTs;

        lastTs = now;
    }

    /** {@inheritDoc} */
    @Override public synchronized int read() throws IOException {
        checkClosed();

        readStart();

        try {
            if (eof())
                return -1;

            buf.refreshAhead(pos);

            int res = buf.atPosition(pos);

            pos++;
            total++;

            // Keep the prefetch window ahead of the new position.
            buf.refreshAhead(pos);

            return res;
        }
        catch (IgniteCheckedException e) {
            throw IgfsHadoopUtils.cast(e);
        }
        finally {
            readEnd();
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
        checkClosed();

        if (eof())
            return -1;

        readStart();

        try {
            long remaining = limit - pos;

            // Serve as much as possible from the prefetch buffer.
            int read = buf.flatten(b, pos, off, len);

            pos += read;
            total += read;
            remaining -= read;

            // Fetch the rest synchronously from the server if the buffer did not cover it.
            if (remaining > 0 && read != len) {
                int readAmt = (int)Math.min(remaining, len - read);

                delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get();

                read += readAmt;
                pos += readAmt;
                total += readAmt;
            }

            buf.refreshAhead(pos);

            return read;
        }
        catch (IgniteCheckedException e) {
            throw IgfsHadoopUtils.cast(e);
        }
        finally {
            readEnd();
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized long skip(long n) throws IOException {
        checkClosed();

        if (clientLog.isLogEnabled())
            clientLog.logSkip(logStreamId, n);

        long oldPos = pos;

        // Position is clamped to the stream limit.
        if (pos + n <= limit)
            pos += n;
        else
            pos = limit;

        buf.refreshAhead(pos);

        return pos - oldPos;
    }

    /** {@inheritDoc} */
    @Override public synchronized int available() throws IOException {
        checkClosed();

        int available = buf.available(pos);

        assert available >= 0;

        return available;
    }

    /** {@inheritDoc} */
    @Override public synchronized void close() throws IOException {
        if (!closed) {
            readStart();

            if (log.isDebugEnabled())
                log.debug("Closing input stream: " + delegate);

            delegate.hadoop().closeStream(delegate);

            readEnd();

            if (clientLog.isLogEnabled())
                clientLog.logCloseIn(logStreamId, userTime, readTime, total);

            markClosed(false);

            if (log.isDebugEnabled())
                log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
                    ", userTime=" + userTime + ']');
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized void mark(int readLimit) {
        // readLimit is ignored: the mark never expires (only logged).
        markPos = pos;

        if (clientLog.isLogEnabled())
            clientLog.logMark(logStreamId, readLimit);
    }

    /** {@inheritDoc} */
    @Override public synchronized void reset() throws IOException {
        checkClosed();

        if (clientLog.isLogEnabled())
            clientLog.logReset(logStreamId);

        if (markPos == -1)
            throw new IOException("Stream was not marked.");

        pos = markPos;

        buf.refreshAhead(pos);
    }

    /** {@inheritDoc} */
    @Override public boolean markSupported() {
        return true;
    }

    /** {@inheritDoc} */
    @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
        long remaining = limit - position;

        int read = (int)Math.min(len, remaining);

        // Return -1 at EOF.
        if (read == 0)
            return -1;

        readFully(position, buf, off, read);

        return read;
    }

    /** {@inheritDoc} */
    @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException {
        long remaining = limit - position;

        checkClosed();

        if (len > remaining)
            throw new EOFException("End of stream reached before data was fully read.");

        readStart();

        try {
            // Positioned read: stream position ('pos') is intentionally left unchanged.
            int read = this.buf.flatten(buf, position, off, len);

            total += read;

            if (read != len) {
                int readAmt = len - read;

                delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get();

                total += readAmt;
            }

            if (clientLog.isLogEnabled())
                clientLog.logRandomRead(logStreamId, position, len);
        }
        catch (IgniteCheckedException e) {
            throw IgfsHadoopUtils.cast(e);
        }
        finally {
            readEnd();
        }
    }

    /** {@inheritDoc} */
    @Override public void readFully(long position, byte[] buf) throws IOException {
        readFully(position, buf, 0, buf.length);
    }

    /** {@inheritDoc} */
    @Override public synchronized void seek(long pos) throws IOException {
        A.ensure(pos >= 0, "position must be non-negative");

        checkClosed();

        if (clientLog.isLogEnabled())
            clientLog.logSeek(logStreamId, pos);

        // Seeking past the end clamps to the limit instead of failing.
        if (pos > limit)
            pos = limit;

        if (log.isDebugEnabled())
            log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']');

        this.pos = pos;

        buf.refreshAhead(pos);
    }

    /** {@inheritDoc} */
    @Override public synchronized long getPos() {
        return pos;
    }

    /** {@inheritDoc} */
    @Override public synchronized boolean seekToNewSource(long targetPos) {
        // No alternative data sources are available.
        return false;
    }

    /** {@inheritDoc} */
    @Override public void onClose() {
        markClosed(true);
    }

    /** {@inheritDoc} */
    @Override public void onError(String errMsg) {
        // No-op.
    }

    /**
     * Marks stream as closed.
     *
     * @param connBroken {@code True} if connection with server was lost.
     */
    private void markClosed(boolean connBroken) {
        // It is ok to have race here.
        if (!closed) {
            closed = true;

            this.connBroken = connBroken;

            delegate.hadoop().removeEventListener(delegate);
        }
    }

    /**
     * @throws IOException If check failed.
     */
    private void checkClosed() throws IOException {
        if (closed) {
            if (connBroken)
                throw new IOException("Server connection was lost.");
            else
                throw new IOException("Stream is closed.");
        }
    }

    /**
     * @return {@code True} if end of stream reached.
     */
    private boolean eof() {
        return limit == pos;
    }

    /**
     * Single asynchronously fetched chunk of the prefetch buffer: a read future plus the
     * file region (position and length) it covers.
     */
    private static class FetchBufferPart {
        /** Read future. */
        private GridPlainFuture<byte[]> readFut;

        /** Position of cached chunk in file. */
        private long pos;

        /** Prefetch length. Need to store as read future result might be not available yet. */
        private int len;

        /**
         * Creates fetch buffer part.
         *
         * @param readFut Read future for this buffer.
         * @param pos Read position.
         * @param len Chunk length.
         */
        private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) {
            this.readFut = readFut;
            this.pos = pos;
            this.len = len;
        }

        /**
         * Copies cached data if specified position matches cached region.
         *
         * @param dst Destination buffer.
         * @param pos Read position in file.
         * @param dstOff Offset in destination buffer from which start writing.
         * @param len Maximum number of bytes to copy.
         * @return Number of bytes copied.
         * @throws IgniteCheckedException If read future failed.
         */
        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
            // If read start position is within cached boundaries.
            if (contains(pos)) {
                // Blocks until the asynchronous fetch completes.
                byte[] data = readFut.get();

                int srcPos = (int)(pos - this.pos);
                int cpLen = Math.min(len, data.length - srcPos);

                U.arrayCopy(data, srcPos, dst, dstOff, cpLen);

                return cpLen;
            }

            return 0;
        }

        /**
         * @return {@code True} if data is ready to be read.
         */
        public boolean ready() {
            return readFut.isDone();
        }

        /**
         * Checks if current buffer part contains given position.
         *
         * @param pos Position to check.
         * @return {@code True} if position matches buffer region.
         */
        public boolean contains(long pos) {
            return this.pos <= pos && this.pos + len > pos;
        }
    }

    /**
     * Two-part prefetch buffer: {@link #first} covers the region being consumed while
     * {@link #second} covers the next contiguous region being fetched in the background.
     * Inner (non-static) class: uses the enclosing stream's delegate, limit and half size.
     */
    private class DoubleFetchBuffer {
        /** */
        private FetchBufferPart first;

        /** */
        private FetchBufferPart second;

        /**
         * Copies fetched data from both buffers to destination array if cached region matched read position.
         *
         * @param dst Destination buffer.
         * @param pos Read position in file.
         * @param dstOff Destination buffer offset.
         * @param len Maximum number of bytes to copy.
         * @return Number of bytes copied.
         * @throws IgniteCheckedException If any read operation failed.
         */
        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
            assert dstOff >= 0;
            assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
                ", len=" + len + ']';

            int bytesCopied = 0;

            if (first != null) {
                bytesCopied += first.flatten(dst, pos, dstOff, len);

                // Spill over into the second part, which is contiguous with the first.
                if (bytesCopied != len && second != null) {
                    assert second.pos == first.pos + first.len;

                    bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
                }
            }

            return bytesCopied;
        }

        /**
         * Gets byte at specified position in buffer.
         *
         * @param pos Stream position.
         * @return Read byte.
         * @throws IgniteCheckedException If read failed.
         */
        public int atPosition(long pos) throws IgniteCheckedException {
            // Should not reach here if stream contains no data.
            assert first != null;

            if (first.contains(pos)) {
                byte[] bytes = first.readFut.get();

                return bytes[((int)(pos - first.pos))] & 0xFF;
            }
            else {
                assert second != null;
                assert second.contains(pos);

                byte[] bytes = second.readFut.get();

                return bytes[((int)(pos - second.pos))] & 0xFF;
            }
        }

        /**
         * Starts asynchronous buffer refresh if needed, depending on current position.
         *
         * @param pos Current stream position.
         */
        public void refreshAhead(long pos) {
            if (fullPrefetch(pos)) {
                // Position fell outside the cached window: refetch both halves.
                first = fetch(pos, bufHalfSize);
                second = fetch(pos + bufHalfSize, bufHalfSize);
            }
            else if (needFlip(pos)) {
                // Promote the second half and start fetching the region after it.
                first = second;

                second = fetch(first.pos + first.len, bufHalfSize);
            }
        }

        /**
         * @param pos Position from which read is expected.
         * @return Number of bytes available to be read without blocking.
         */
        public int available(long pos) {
            int available = 0;

            if (first != null) {
                if (first.contains(pos)) {
                    if (first.ready()) {
                        // NOTE(review): this adds bytes BEFORE 'pos' in the first part; bytes
                        // remaining after 'pos' would be first.pos + first.len - pos — verify.
                        available += (pos - first.pos);

                        if (second != null && second.ready())
                            available += second.len;
                    }
                }
                else {
                    // NOTE(review): same concern — counts bytes before 'pos' rather than after it.
                    if (second != null && second.contains(pos) && second.ready())
                        available += (pos - second.pos);
                }
            }

            return available;
        }

        /**
         * Checks if position shifted enough to forget previous buffer.
         *
         * @param pos Current position.
         * @return {@code True} if need flip buffers.
         */
        private boolean needFlip(long pos) {
            // Flip once the read position has advanced into the second buffer part.
            return second != null && second.contains(pos);
        }

        /**
         * Determines if all cached bytes should be discarded and new region should be
         * prefetched.
         *
         * @param curPos Current stream position.
         * @return {@code True} if need to refresh both blocks.
         */
        private boolean fullPrefetch(long curPos) {
            // If no data was prefetched yet, or position moved outside the cached window.
            return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
        }

        /**
         * Starts asynchronous fetch for given region.
         *
         * @param pos Position to read from.
         * @param size Number of bytes to read.
         * @return Fetch buffer part, or {@code null} if the region is past the stream limit.
         */
        private FetchBufferPart fetch(long pos, int size) {
            long remaining = limit - pos;

            size = (int)Math.min(size, remaining);

            return size <= 0 ? null :
                new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java
deleted file mode 100644
index 46f5a6c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
/**
 * IO abstraction layer for IGFS client. Two kinds of messages are expected to be sent: requests with a response
 * and requests without a response.
 */
public interface IgfsHadoopIo {
    /**
     * Sends given IGFS client message and asynchronously awaits for response.
     *
     * @param msg Message to send.
     * @return Future that will be completed when the response arrives.
     * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
     */
    public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException;

    /**
     * Sends given IGFS client message and asynchronously awaits for response. When IO detects response
     * beginning for given message it stops reading data and passes input stream to closure which can read
     * response in a specific way.
     *
     * @param msg Message to send.
     * @param outBuf Output buffer to receive response bytes. If {@code null}, the output buffer is not used.
     * @param outOff Output buffer offset.
     * @param outLen Output buffer length.
     * @return Future that will be completed when response is returned from closure.
     * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
     */
    public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen)
        throws IgniteCheckedException;

    /**
     * Sends given message and does not wait for response.
     *
     * @param msg Message to send.
     * @throws IgniteCheckedException If send failed.
     */
    public void sendPlain(IgfsMessage msg) throws IgniteCheckedException;

    /**
     * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
     * If connection is closed already, callback will be invoked synchronously inside this method.
     *
     * @param lsnr Event listener.
     */
    public void addEventListener(IgfsHadoopIpcIoListener lsnr);

    /**
     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
     *
     * @param lsnr Event listener.
     */
    public void removeEventListener(IgfsHadoopIpcIoListener lsnr);
}