You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 14:22:05 UTC
[03/12] ignite git commit: IGNITE-1477: Fixed.
IGNITE-1477: Fixed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0e1ac18
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0e1ac18
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0e1ac18
Branch: refs/heads/master
Commit: c0e1ac1842df19a4de83dbbcc99090b43371c913
Parents: c065512
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 15 10:25:09 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 15 10:25:09 2015 +0300
----------------------------------------------------------------------
.../hadoop/igfs/HadoopIgfsWrapper.java | 94 +++++++++++---------
1 file changed, 53 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e1ac18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index abbb142..01189f7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -339,7 +340,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
* @return Delegate.
*/
private Delegate delegate() throws HadoopIgfsCommunicationException {
- Exception err = null;
+ // These fields will contain possible exceptions from shmem and TCP endpoints.
+ Exception errShmem = null;
+ Exception errTcp = null;
// 1. If delegate is set, return it immediately.
Delegate curDelegate = delegateRef.get();
@@ -357,8 +360,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
}
- catch (Exception e) {
- err = e;
+ catch (Exception ignore) {
+ // No-op.
}
}
else {
@@ -368,8 +371,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
break;
}
- catch (Exception e) {
- err = e;
+ catch (Exception ignore) {
+ // No-op.
}
}
}
@@ -388,57 +391,54 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
hadoop.close(true);
if (log.isDebugEnabled())
- log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
-
- err = e;
+ log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
}
}
}
// 3. Try connecting using shmem.
- if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
- if (curDelegate == null && !U.isWindows()) {
- HadoopIgfsEx hadoop = null;
+ boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
- try {
- hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
+ if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
+ HadoopIgfsEx hadoop = null;
- curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
- }
- catch (IOException | IgniteCheckedException e) {
- if (e instanceof HadoopIgfsCommunicationException)
- hadoop.close(true);
+ try {
+ hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
- if (log.isDebugEnabled())
- log.debug("Failed to connect to out-proc local IGFS using shmem.", e);
+ curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (e instanceof HadoopIgfsCommunicationException)
+ hadoop.close(true);
- err = e;
- }
+ if (log.isDebugEnabled())
+ log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
+
+ errShmem = e;
}
}
// 4. Try local TCP connection.
boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
- if (!skipLocTcp) {
- if (curDelegate == null) {
- HadoopIgfsEx hadoop = null;
+ if (curDelegate == null && !skipLocTcp) {
+ HadoopIgfsEx hadoop = null;
- try {
- hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
- log, userName);
+ try {
+ hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+ log, userName);
- curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
- }
- catch (IOException | IgniteCheckedException e) {
- if (e instanceof HadoopIgfsCommunicationException)
- hadoop.close(true);
+ curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (e instanceof HadoopIgfsCommunicationException)
+ hadoop.close(true);
- if (log.isDebugEnabled())
- log.debug("Failed to connect to out-proc local IGFS using TCP.", e);
+ if (log.isDebugEnabled())
+ log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+ ", port=" + endpoint.port() + ']', e);
- err = e;
- }
+ errTcp = e;
}
}
@@ -457,9 +457,10 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
hadoop.close(true);
if (log.isDebugEnabled())
- log.debug("Failed to connect to out-proc remote IGFS using TCP.", e);
+ log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+ ", port=" + endpoint.port() + ']', e);
- err = e;
+ errTcp = e;
}
}
@@ -469,8 +470,19 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
return curDelegate;
}
- else
- throw new HadoopIgfsCommunicationException("Failed to connect to IGFS: " + endpoint, err);
+ else {
+ SB errMsg = new SB("Failed to connect to IGFS [endpoint=" + authority + ", attempts=[");
+
+ if (errShmem != null)
+ errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
+
+ errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
+
+ errMsg.a("(ensure that IGFS is running and has an IPC endpoint enabled; ensure that " +
+ "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
+
+ throw new HadoopIgfsCommunicationException(errMsg.toString());
+ }
}
/**