You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:45 UTC
[34/51] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
deleted file mode 100644
index 9aaab4c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * IGFS Hadoop stream descriptor.
- */
-public class IgfsHadoopStreamDelegate {
- /** RPC handler. */
- private final IgfsHadoopEx hadoop;
-
- /** Target. */
- private final Object target;
-
- /** Optional stream length. */
- private final long len;
-
- /**
- * Constructor.
- *
- * @param target Target.
- */
- public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target) {
- this(hadoop, target, -1);
- }
-
- /**
- * Constructor.
- *
- * @param target Target.
- * @param len Optional length.
- */
- public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target, long len) {
- assert hadoop != null;
- assert target != null;
-
- this.hadoop = hadoop;
- this.target = target;
- this.len = len;
- }
-
- /**
- * @return RPC handler.
- */
- public IgfsHadoopEx hadoop() {
- return hadoop;
- }
-
- /**
- * @return Stream target.
- */
- @SuppressWarnings("unchecked")
- public <T> T target() {
- return (T) target;
- }
-
- /**
- * @return Length.
- */
- public long length() {
- return len;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return System.identityHashCode(target);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- return obj != null && obj instanceof IgfsHadoopStreamDelegate &&
- target == ((IgfsHadoopStreamDelegate)obj).target;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgfsHadoopStreamDelegate.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
deleted file mode 100644
index 20d7f2a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-
-/**
- * IGFS input stream event listener.
- */
-public interface IgfsHadoopStreamEventListener {
- /**
- * Callback invoked when the stream is being closed.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void onClose() throws IgniteCheckedException;
-
- /**
- * Callback invoked when remote error occurs.
- *
- * @param errMsg Error message.
- */
- public void onError(String errMsg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
deleted file mode 100644
index bd96e60..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Utility constants and methods for IGFS Hadoop file system.
- */
-public class IgfsHadoopUtils {
- /** Parameter name for endpoint no embed mode flag. */
- public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
-
- /** Parameter name for endpoint no shared memory flag. */
- public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
-
- /** Parameter name for endpoint no local TCP flag. */
- public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
-
- /**
- * Get string parameter.
- *
- * @param cfg Configuration.
- * @param name Parameter name.
- * @param authority Authority.
- * @param dflt Default value.
- * @return String value.
- */
- public static String parameter(Configuration cfg, String name, String authority, String dflt) {
- return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
- }
-
- /**
- * Get integer parameter.
- *
- * @param cfg Configuration.
- * @param name Parameter name.
- * @param authority Authority.
- * @param dflt Default value.
- * @return Integer value.
- * @throws IOException In case of parse exception.
- */
- public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
- String name0 = String.format(name, authority != null ? authority : "");
-
- try {
- return cfg.getInt(name0, dflt);
- }
- catch (NumberFormatException ignore) {
- throw new IOException("Failed to parse parameter value to integer: " + name0);
- }
- }
-
- /**
- * Get boolean parameter.
- *
- * @param cfg Configuration.
- * @param name Parameter name.
- * @param authority Authority.
- * @param dflt Default value.
- * @return Boolean value.
- */
- public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
- return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
- }
-
- /**
- * Cast Ignite exception to appropriate IO exception.
- *
- * @param e Exception to cast.
- * @return Casted exception.
- */
- public static IOException cast(IgniteCheckedException e) {
- return cast(e, null);
- }
-
- /**
- * Cast Ignite exception to appropriate IO exception.
- *
- * @param e Exception to cast.
- * @param path Path for exceptions.
- * @return Casted exception.
- */
- @SuppressWarnings("unchecked")
- public static IOException cast(IgniteCheckedException e, @Nullable String path) {
- assert e != null;
-
- // First check for any nested IOException; if exists - re-throw it.
- if (e.hasCause(IOException.class))
- return e.getCause(IOException.class);
- else if (e.hasCause(IgfsFileNotFoundException.class))
- return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
- else if (e.hasCause(IgfsParentNotDirectoryException.class))
- return new ParentNotDirectoryException(path);
- else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
- return new PathIsNotEmptyDirectoryException(path);
- else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
- return new PathExistsException(path);
- else
- return new IOException(e);
- }
-
- /**
- * Constructor.
- */
- private IgfsHadoopUtils() {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
deleted file mode 100644
index 5586e72..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopEndpoint.*;
-import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
-
-/**
- * Wrapper for IGFS server.
- */
-public class IgfsHadoopWrapper implements IgfsHadoop {
- /** Delegate. */
- private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
-
- /** Authority. */
- private final String authority;
-
- /** Connection string. */
- private final IgfsHadoopEndpoint endpoint;
-
- /** Log directory. */
- private final String logDir;
-
- /** Configuration. */
- private final Configuration conf;
-
- /** Logger. */
- private final Log log;
-
- /**
- * Constructor.
- *
- * @param authority Authority (connection string).
- * @param logDir Log directory for server.
- * @param conf Configuration.
- * @param log Current logger.
- */
- public IgfsHadoopWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
- try {
- this.authority = authority;
- this.endpoint = new IgfsHadoopEndpoint(authority);
- this.logDir = logDir;
- this.conf = conf;
- this.log = log;
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to parse endpoint: " + authority, e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
- @Override public IgfsHandshakeResponse apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) {
- return hndResp;
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public void close(boolean force) {
- Delegate delegate = delegateRef.get();
-
- if (delegate != null && delegateRef.compareAndSet(delegate, null))
- delegate.close(force);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile info(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
- @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.info(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
- @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.update(path, props);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
- throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.setTimes(path, accessTime, modificationTime);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.rename(src, dest);
- }
- }, src);
- }
-
- /** {@inheritDoc} */
- @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.delete(path, recursive);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
- final long len) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
- @Override public Collection<IgfsBlockLocation> apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.affinity(path, start, len);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
- @Override public IgfsPathSummary apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.contentSummary(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.mkdirs(path, props);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
- @Override public Collection<IgfsFile> apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.listFiles(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
- @Override public Collection<IgfsPath> apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.listPaths(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsStatus fsStatus() throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
- @Override public IgfsStatus apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.fsStatus();
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate open(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
- @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.open(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
- throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
- @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.open(path, seqReadsBeforePrefetch);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate create(final IgfsPath path, final boolean overwrite,
- final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
- throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
- @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
- }
- }, path);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHadoopStreamDelegate append(final IgfsPath path, final boolean create,
- @Nullable final Map<String, String> props) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
- @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.append(path, create, props);
- }
- }, path);
- }
-
- /**
- * Execute closure which is not path-specific.
- *
- * @param clo Closure.
- * @return Result.
- * @throws IOException If failed.
- */
- private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
- return withReconnectHandling(clo, null);
- }
-
- /**
- * Execute closure.
- *
- * @param clo Closure.
- * @param path Path for exceptions.
- * @return Result.
- * @throws IOException If failed.
- */
- private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
- throws IOException {
- Exception err = null;
-
- for (int i = 0; i < 2; i++) {
- Delegate curDelegate = null;
-
- boolean close = false;
- boolean force = false;
-
- try {
- curDelegate = delegate();
-
- assert curDelegate != null;
-
- close = curDelegate.doomed;
-
- return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
- }
- catch (IgfsHadoopCommunicationException e) {
- if (curDelegate != null && !curDelegate.doomed) {
- // Try getting rid fo faulty delegate ASAP.
- delegateRef.compareAndSet(curDelegate, null);
-
- close = true;
- force = true;
- }
-
- if (log.isDebugEnabled())
- log.debug("Failed to send message to a server: " + e);
-
- err = e;
- }
- catch (IgniteCheckedException e) {
- throw IgfsHadoopUtils.cast(e, path != null ? path.toString() : null);
- }
- finally {
- if (close) {
- assert curDelegate != null;
-
- curDelegate.close(force);
- }
- }
- }
-
- throw new IOException("Failed to communicate with IGFS.", err);
- }
-
- /**
- * Get delegate creating it if needed.
- *
- * @return Delegate.
- */
- private Delegate delegate() throws IgfsHadoopCommunicationException {
- Exception err = null;
-
- // 1. If delegate is set, return it immediately.
- Delegate curDelegate = delegateRef.get();
-
- if (curDelegate != null)
- return curDelegate;
-
- // 2. Guess that we are in the same VM.
- if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false)) {
- IgfsEx igfs = null;
-
- if (endpoint.grid() == null) {
- try {
- Ignite ignite = G.ignite();
-
- igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
- }
- catch (Exception e) {
- err = e;
- }
- }
- else {
- for (Ignite ignite : G.allGrids()) {
- try {
- igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
-
- break;
- }
- catch (Exception e) {
- err = e;
- }
- }
- }
-
- if (igfs != null) {
- IgfsHadoopEx hadoop = null;
-
- try {
- hadoop = new IgfsHadoopInProc(igfs, log);
-
- curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
- }
- catch (IOException | IgniteCheckedException e) {
- if (e instanceof IgfsHadoopCommunicationException)
- hadoop.close(true);
-
- if (log.isDebugEnabled())
- log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
-
- err = e;
- }
- }
- }
-
- // 3. Try connecting using shmem.
- if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
- if (curDelegate == null && !U.isWindows()) {
- IgfsHadoopEx hadoop = null;
-
- try {
- hadoop = new IgfsHadoopOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
-
- curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
- }
- catch (IOException | IgniteCheckedException e) {
- if (e instanceof IgfsHadoopCommunicationException)
- hadoop.close(true);
-
- if (log.isDebugEnabled())
- log.debug("Failed to connect to out-proc local IGFS using shmem.", e);
-
- err = e;
- }
- }
- }
-
- // 4. Try local TCP connection.
- boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
-
- if (!skipLocTcp) {
- if (curDelegate == null) {
- IgfsHadoopEx hadoop = null;
-
- try {
- hadoop = new IgfsHadoopOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
- log);
-
- curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
- }
- catch (IOException | IgniteCheckedException e) {
- if (e instanceof IgfsHadoopCommunicationException)
- hadoop.close(true);
-
- if (log.isDebugEnabled())
- log.debug("Failed to connect to out-proc local IGFS using TCP.", e);
-
- err = e;
- }
- }
- }
-
- // 5. Try remote TCP connection.
- if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
- IgfsHadoopEx hadoop = null;
-
- try {
- hadoop = new IgfsHadoopOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
-
- curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
- }
- catch (IOException | IgniteCheckedException e) {
- if (e instanceof IgfsHadoopCommunicationException)
- hadoop.close(true);
-
- if (log.isDebugEnabled())
- log.debug("Failed to connect to out-proc remote IGFS using TCP.", e);
-
- err = e;
- }
- }
-
- if (curDelegate != null) {
- if (!delegateRef.compareAndSet(null, curDelegate))
- curDelegate.doomed = true;
-
- return curDelegate;
- }
- else
- throw new IgfsHadoopCommunicationException("Failed to connect to IGFS: " + endpoint, err);
- }
-
- /**
- * File system operation closure.
- */
- private static interface FileSystemClosure<T> {
- /**
- * Call closure body.
- *
- * @param hadoop RPC handler.
- * @param hndResp Handshake response.
- * @return Result.
- * @throws IgniteCheckedException If failed.
- * @throws IOException If failed.
- */
- public T apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
- }
-
- /**
- * Delegate.
- */
- private static class Delegate {
- /** RPC handler. */
- private final IgfsHadoopEx hadoop;
-
- /** Handshake request. */
- private final IgfsHandshakeResponse hndResp;
-
- /** Close guard. */
- private final AtomicBoolean closeGuard = new AtomicBoolean();
-
- /** Whether this delegate must be closed at the end of the next invocation. */
- private boolean doomed;
-
- /**
- * Constructor.
- *
- * @param hadoop Hadoop.
- * @param hndResp Handshake response.
- */
- private Delegate(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) {
- this.hadoop = hadoop;
- this.hndResp = hndResp;
- }
-
- /**
- * Close underlying RPC handler.
- *
- * @param force Force flag.
- */
- private void close(boolean force) {
- if (closeGuard.compareAndSet(false, true))
- hadoop.close(force);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
deleted file mode 100644
index ec380f2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
- <!-- Package description. -->
- Contains IGFS client classes.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html
deleted file mode 100644
index 4b070d3..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
- <!-- Package description. -->
- Contains IGFS client and common classes.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java
deleted file mode 100644
index bc4c0bb..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-import org.objectweb.asm.*;
-import org.objectweb.asm.commons.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Class loader allowing explicitly load classes without delegation to parent class loader.
- * Also supports class parsing for finding dependencies which contain transitive dependencies
- * unavailable for parent.
- */
-public class GridHadoopClassLoader extends URLClassLoader {
- /**
- * We are very parallel capable.
- */
- static {
- registerAsParallelCapable();
- }
-
- /** */
- private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)GridHadoopClassLoader.class.getClassLoader();
-
- /** */
- private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs());
-
- /** */
- private static volatile Collection<URL> hadoopJars;
-
- /** */
- private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>();
-
- /** */
- private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>();
-
- /**
- * @param urls Urls.
- */
- public GridHadoopClassLoader(URL[] urls) {
- super(addHadoopUrls(urls), APP_CLS_LDR);
-
- assert !(getParent() instanceof GridHadoopClassLoader);
- }
-
- /**
- * Need to parse only Ignite Hadoop and IGFS classes.
- *
- * @param cls Class name.
- * @return {@code true} if we need to check this class.
- */
- private static boolean isIgfsHadoop(String cls) {
- String ignitePackagePrefix = "org.apache.ignite";
- int len = ignitePackagePrefix.length();
-
- return cls.startsWith(ignitePackagePrefix) && (cls.indexOf("igfs.", len) != -1 || cls.indexOf(".fs.", len) != -1 || cls.indexOf("hadoop.", len) != -1);
- }
-
- /**
- * @param cls Class name.
- * @return {@code true} If this is Hadoop class.
- */
- private static boolean isHadoop(String cls) {
- return cls.startsWith("org.apache.hadoop.");
- }
-
- /** {@inheritDoc} */
- @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
- try {
- if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
- if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks.
- return loadFromBytes(name, GridHadoopShutdownHookManager.class.getName());
- else if (name.endsWith(".util.NativeCodeLoader"))
- return loadFromBytes(name, GridHadoopNativeCodeLoader.class.getName());
-
- return loadClassExplicitly(name, resolve);
- }
-
- if (isIgfsHadoop(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop.
- Boolean hasDeps = cache.get(name);
-
- if (hasDeps == null) {
- hasDeps = hasExternalDependencies(name, new HashSet<String>());
-
- cache.put(name, hasDeps);
- }
-
- if (hasDeps)
- return loadClassExplicitly(name, resolve);
- }
-
- return super.loadClass(name, resolve);
- }
- catch (NoClassDefFoundError | ClassNotFoundException e) {
- throw new ClassNotFoundException("Failed to load class: " + name, e);
- }
- }
-
- /**
- * @param name Name.
- * @param replace Replacement.
- * @return Class.
- */
- private Class<?> loadFromBytes(final String name, final String replace) {
- synchronized (getClassLoadingLock(name)) {
- // First, check if the class has already been loaded
- Class c = findLoadedClass(name);
-
- if (c != null)
- return c;
-
- byte[] bytes = bytesCache.get(name);
-
- if (bytes == null) {
- InputStream in = loadClassBytes(getParent(), replace);
-
- ClassReader rdr;
-
- try {
- rdr = new ClassReader(in);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- ClassWriter w = new ClassWriter(Opcodes.ASM4);
-
- rdr.accept(new RemappingClassAdapter(w, new Remapper() {
- /** */
- String replaceType = replace.replace('.', '/');
-
- /** */
- String nameType = name.replace('.', '/');
-
- @Override public String map(String type) {
- if (type.equals(replaceType))
- return nameType;
-
- return type;
- }
- }), ClassReader.EXPAND_FRAMES);
-
- bytes = w.toByteArray();
-
- bytesCache.put(name, bytes);
- }
-
- return defineClass(name, bytes, 0, bytes.length);
- }
- }
-
- /**
- * @param name Class name.
- * @param resolve Resolve class.
- * @return Class.
- * @throws ClassNotFoundException If failed.
- */
- private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException {
- synchronized (getClassLoadingLock(name)) {
- // First, check if the class has already been loaded
- Class c = findLoadedClass(name);
-
- if (c == null) {
- long t1 = System.nanoTime();
-
- c = findClass(name);
-
- // this is the defining class loader; record the stats
- sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
- sun.misc.PerfCounter.getFindClasses().increment();
- }
-
- if (resolve)
- resolveClass(c);
-
- return c;
- }
- }
-
- /**
- * @param ldr Loader.
- * @param clsName Class.
- * @return Input stream.
- */
- @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) {
- return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class");
- }
-
- /**
- * @param clsName Class name.
- * @return {@code true} If the class has external dependencies.
- */
- boolean hasExternalDependencies(final String clsName, final Set<String> visited) {
- if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external.
- return true;
-
- // Try to get from parent to check if the type accessible.
- InputStream in = loadClassBytes(getParent(), clsName);
-
- if (in == null) // The class is external itself, it must be loaded from this class loader.
- return true;
-
- if (!isIgfsHadoop(clsName)) // Other classes should not have external dependencies.
- return false;
-
- final ClassReader rdr;
-
- try {
- rdr = new ClassReader(in);
- }
- catch (IOException e) {
- throw new RuntimeException("Failed to read class: " + clsName, e);
- }
-
- visited.add(clsName);
-
- final AtomicBoolean hasDeps = new AtomicBoolean();
-
- rdr.accept(new ClassVisitor(Opcodes.ASM4) {
- AnnotationVisitor av = new AnnotationVisitor(Opcodes.ASM4) {
- // TODO
- };
-
- FieldVisitor fv = new FieldVisitor(Opcodes.ASM4) {
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) {
- onType(desc);
-
- return av;
- }
- };
-
- MethodVisitor mv = new MethodVisitor(Opcodes.ASM4) {
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) {
- onType(desc);
-
- return av;
- }
-
- @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) {
- onType(desc);
-
- return av;
- }
-
- @Override public AnnotationVisitor visitAnnotationDefault() {
- return av;
- }
-
- @Override public void visitFieldInsn(int i, String owner, String name, String desc) {
- onType(owner);
- onType(desc);
- }
-
- @Override public void visitFrame(int i, int i2, Object[] locTypes, int i3, Object[] stackTypes) {
- for (Object o : locTypes) {
- if (o instanceof String)
- onType((String)o);
- }
-
- for (Object o : stackTypes) {
- if (o instanceof String)
- onType((String)o);
- }
- }
-
- @Override public void visitLocalVariable(String name, String desc, String signature, Label lb,
- Label lb2, int i) {
- onType(desc);
- }
-
- @Override public void visitMethodInsn(int i, String owner, String name, String desc) {
- onType(owner);
- }
-
- @Override public void visitMultiANewArrayInsn(String desc, int dim) {
- onType(desc);
- }
-
- @Override public void visitTryCatchBlock(Label lb, Label lb2, Label lb3, String e) {
- onType(e);
- }
- };
-
- void onClass(String depCls) {
- assert validateClassName(depCls) : depCls;
-
- if (depCls.startsWith("java.")) // Filter out platform classes.
- return;
-
- if (visited.contains(depCls))
- return;
-
- Boolean res = cache.get(depCls);
-
- if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, visited)))
- hasDeps.set(true);
- }
-
- void onType(String type) {
- if (type == null)
- return;
-
- int off = 0;
-
- while (type.charAt(off) == '[')
- off++; // Handle arrays.
-
- if (off != 0)
- type = type.substring(off);
-
- if (type.length() == 1)
- return; // Get rid of primitives.
-
- if (type.charAt(type.length() - 1) == ';') {
- assert type.charAt(0) == 'L' : type;
-
- type = type.substring(1, type.length() - 1);
- }
-
- type = type.replace('/', '.');
-
- onClass(type);
- }
-
- @Override public void visit(int i, int i2, String name, String signature, String superName,
- String[] ifaces) {
- onType(superName);
-
- if (ifaces != null) {
- for (String iface : ifaces)
- onType(iface);
- }
- }
-
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- onType(desc);
-
- return av;
- }
-
- @Override public void visitInnerClass(String name, String outerName, String innerName, int i) {
- onType(name);
- }
-
- @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) {
- onType(desc);
-
- return fv;
- }
-
- @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature,
- String[] exceptions) {
- if (exceptions != null) {
- for (String e : exceptions)
- onType(e);
- }
-
- return mv;
- }
- }, 0);
-
- if (hasDeps.get()) // We already know that we have dependencies, no need to check parent.
- return true;
-
- // Here we are known to not have any dependencies but possibly we have a parent which have them.
- int idx = clsName.lastIndexOf('$');
-
- if (idx == -1) // No parent class.
- return false;
-
- String parentCls = clsName.substring(0, idx);
-
- if (visited.contains(parentCls))
- return false;
-
- Boolean res = cache.get(parentCls);
-
- if (res == null)
- res = hasExternalDependencies(parentCls, visited);
-
- return res;
- }
-
- /**
- * @param name Class name.
- * @return {@code true} If this is a valid class name.
- */
- private static boolean validateClassName(String name) {
- int len = name.length();
-
- if (len <= 1)
- return false;
-
- if (!Character.isJavaIdentifierStart(name.charAt(0)))
- return false;
-
- boolean hasDot = false;
-
- for (int i = 1; i < len; i++) {
- char c = name.charAt(i);
-
- if (c == '.')
- hasDot = true;
- else if (!Character.isJavaIdentifierPart(c))
- return false;
- }
-
- return hasDot;
- }
-
- /**
- * @param name Variable name.
- * @param dflt Default.
- * @return Value.
- */
- private static String getEnv(String name, String dflt) {
- String res = System.getProperty(name);
-
- if (F.isEmpty(res))
- res = System.getenv(name);
-
- return F.isEmpty(res) ? dflt : res;
- }
-
- /**
- * @param res Result.
- * @param dir Directory.
- * @param startsWith Starts with prefix.
- * @throws MalformedURLException If failed.
- */
- private static void addUrls(Collection<URL> res, File dir, final String startsWith) throws Exception {
- File[] files = dir.listFiles(new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return startsWith == null || name.startsWith(startsWith);
- }
- });
-
- if (files == null)
- throw new IOException("Path is not a directory: " + dir);
-
- for (File file : files)
- res.add(file.toURI().toURL());
- }
-
- /**
- * @param urls URLs.
- * @return URLs.
- */
- private static URL[] addHadoopUrls(URL[] urls) {
- Collection<URL> hadoopJars;
-
- try {
- hadoopJars = hadoopUrls();
- }
- catch (IgniteCheckedException e) {
- throw new RuntimeException(e);
- }
-
- ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length));
-
- list.addAll(appJars);
- list.addAll(hadoopJars);
-
- if (!F.isEmpty(urls))
- list.addAll(F.asList(urls));
-
- return list.toArray(new URL[list.size()]);
- }
-
- /**
- * @return HADOOP_HOME Variable.
- */
- @Nullable public static String hadoopHome() {
- return getEnv("HADOOP_PREFIX", getEnv("HADOOP_HOME", null));
- }
-
- /**
- * @return Collection of jar URLs.
- * @throws IgniteCheckedException If failed.
- */
- public static Collection<URL> hadoopUrls() throws IgniteCheckedException {
- Collection<URL> hadoopUrls = hadoopJars;
-
- if (hadoopUrls != null)
- return hadoopUrls;
-
- synchronized (GridHadoopClassLoader.class) {
- hadoopUrls = hadoopJars;
-
- if (hadoopUrls != null)
- return hadoopUrls;
-
- hadoopUrls = new ArrayList<>();
-
- String hadoopPrefix = hadoopHome();
-
- if (F.isEmpty(hadoopPrefix))
- throw new IgniteCheckedException("Failed resolve Hadoop installation location. Either HADOOP_PREFIX or " +
- "HADOOP_HOME environment variables must be set.");
-
- String commonHome = getEnv("HADOOP_COMMON_HOME", hadoopPrefix + "/share/hadoop/common");
- String hdfsHome = getEnv("HADOOP_HDFS_HOME", hadoopPrefix + "/share/hadoop/hdfs");
- String mapredHome = getEnv("HADOOP_MAPRED_HOME", hadoopPrefix + "/share/hadoop/mapreduce");
-
- try {
- addUrls(hadoopUrls, new File(commonHome + "/lib"), null);
- addUrls(hadoopUrls, new File(hdfsHome + "/lib"), null);
- addUrls(hadoopUrls, new File(mapredHome + "/lib"), null);
-
- addUrls(hadoopUrls, new File(hdfsHome), "hadoop-hdfs-");
-
- addUrls(hadoopUrls, new File(commonHome), "hadoop-common-");
- addUrls(hadoopUrls, new File(commonHome), "hadoop-auth-");
- addUrls(hadoopUrls, new File(commonHome + "/lib"), "hadoop-auth-");
-
- addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-common");
- addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-core");
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
-
- hadoopJars = hadoopUrls;
-
- return hadoopUrls;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java
deleted file mode 100644
index 337bfe9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-
-/**
- * Abstract class for all hadoop components.
- */
-public abstract class GridHadoopComponent {
- /** Hadoop context. */
- protected GridHadoopContext ctx;
-
- /** Logger. */
- protected IgniteLogger log;
-
- /**
- * @param ctx Hadoop context.
- */
- public void start(GridHadoopContext ctx) throws IgniteCheckedException {
- this.ctx = ctx;
-
- log = ctx.kernalContext().log(getClass());
- }
-
- /**
- * Stops manager.
- */
- public void stop(boolean cancel) {
- // No-op.
- }
-
- /**
- * Callback invoked when all grid components are started.
- */
- public void onKernalStart() throws IgniteCheckedException {
- // No-op.
- }
-
- /**
- * Callback invoked before all grid components are stopped.
- */
- public void onKernalStop(boolean cancel) {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java
deleted file mode 100644
index 3160e3d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Hadoop accelerator context.
- */
-public class GridHadoopContext {
- /** Kernal context. */
- private GridKernalContext ctx;
-
- /** Hadoop configuration. */
- private GridHadoopConfiguration cfg;
-
- /** Job tracker. */
- private GridHadoopJobTracker jobTracker;
-
- /** External task executor. */
- private GridHadoopTaskExecutorAdapter taskExecutor;
-
- /** */
- private GridHadoopShuffle shuffle;
-
- /** Managers list. */
- private List<GridHadoopComponent> components = new ArrayList<>();
-
- /**
- * @param ctx Kernal context.
- */
- public GridHadoopContext(
- GridKernalContext ctx,
- GridHadoopConfiguration cfg,
- GridHadoopJobTracker jobTracker,
- GridHadoopTaskExecutorAdapter taskExecutor,
- GridHadoopShuffle shuffle
- ) {
- this.ctx = ctx;
- this.cfg = cfg;
-
- this.jobTracker = add(jobTracker);
- this.taskExecutor = add(taskExecutor);
- this.shuffle = add(shuffle);
- }
-
- /**
- * Gets list of managers.
- *
- * @return List of managers.
- */
- public List<GridHadoopComponent> components() {
- return components;
- }
-
- /**
- * Gets kernal context.
- *
- * @return Grid kernal context instance.
- */
- public GridKernalContext kernalContext() {
- return ctx;
- }
-
- /**
- * Gets Hadoop configuration.
- *
- * @return Hadoop configuration.
- */
- public GridHadoopConfiguration configuration() {
- return cfg;
- }
-
- /**
- * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
- *
- * @return Local node ID.
- */
- public UUID localNodeId() {
- return ctx.localNodeId();
- }
-
- /**
- * Gets local node order.
- *
- * @return Local node order.
- */
- public long localNodeOrder() {
- assert ctx.discovery() != null;
-
- return ctx.discovery().localNode().order();
- }
-
- /**
- * @return Hadoop-enabled nodes.
- */
- public Collection<ClusterNode> nodes() {
- return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersion());
- }
-
- /**
- * @return {@code True} if
- */
- public boolean jobUpdateLeader() {
- long minOrder = Long.MAX_VALUE;
- ClusterNode minOrderNode = null;
-
- for (ClusterNode node : nodes()) {
- if (node.order() < minOrder) {
- minOrder = node.order();
- minOrderNode = node;
- }
- }
-
- assert minOrderNode != null;
-
- return localNodeId().equals(minOrderNode.id());
- }
-
- /**
- * @param meta Job metadata.
- * @return {@code true} If local node is participating in job execution.
- */
- public boolean isParticipating(GridHadoopJobMetadata meta) {
- UUID locNodeId = localNodeId();
-
- if (locNodeId.equals(meta.submitNodeId()))
- return true;
-
- GridHadoopMapReducePlan plan = meta.mapReducePlan();
-
- return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
- }
-
- /**
- * @return Jon tracker instance.
- */
- public GridHadoopJobTracker jobTracker() {
- return jobTracker;
- }
-
- /**
- * @return Task executor.
- */
- public GridHadoopTaskExecutorAdapter taskExecutor() {
- return taskExecutor;
- }
-
- /**
- * @return Shuffle.
- */
- public GridHadoopShuffle shuffle() {
- return shuffle;
- }
-
- /**
- * @return Map-reduce planner.
- */
- public GridHadoopMapReducePlanner planner() {
- return cfg.getMapReducePlanner();
- }
-
- /**
- * Adds component.
- *
- * @param c Component to add.
- * @return Added manager.
- */
- private <C extends GridHadoopComponent> C add(C c) {
- components.add(c);
-
- return c;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java
deleted file mode 100644
index 555c573..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.*;
-
-/**
- * Hadoop job info based on default Hadoop configuration.
- */
-public class GridHadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
- /** */
- private static final long serialVersionUID = 5489900236464999951L;
-
- /** {@code true} If job has combiner. */
- private boolean hasCombiner;
-
- /** Number of reducers configured for job. */
- private int numReduces;
-
- /** Configuration. */
- private Map<String,String> props = new HashMap<>();
-
- /** Job name. */
- private String jobName;
-
- /** User name. */
- private String user;
-
- /** */
- private static volatile Class<?> jobCls;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public GridHadoopDefaultJobInfo() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param jobName Job name.
- * @param user User name.
- * @param hasCombiner {@code true} If job has combiner.
- * @param numReduces Number of reducers configured for job.
- * @param props All other properties of the job.
- */
- public GridHadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
- Map<String, String> props) {
- this.jobName = jobName;
- this.user = user;
- this.hasCombiner = hasCombiner;
- this.numReduces = numReduces;
- this.props = props;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String property(String name) {
- return props.get(name);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
- try {
- Class<?> jobCls0 = jobCls;
-
- if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
- synchronized (GridHadoopDefaultJobInfo.class) {
- if ((jobCls0 = jobCls) == null) {
- GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);
-
- jobCls = jobCls0 = ldr.loadClass(GridHadoopV2Job.class.getName());
- }
- }
- }
-
- Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, GridHadoopDefaultJobInfo.class,
- IgniteLogger.class);
-
- return (GridHadoopJob)constructor.newInstance(jobId, this, log);
- }
- // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call.
- catch (Throwable t) {
- throw new IgniteCheckedException(t);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasCombiner() {
- return hasCombiner;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasReducer() {
- return reducers() > 0;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- return numReduces;
- }
-
- /** {@inheritDoc} */
- @Override public String jobName() {
- return jobName;
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- return user;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, jobName);
- U.writeString(out, user);
-
- out.writeBoolean(hasCombiner);
- out.writeInt(numReduces);
-
- U.writeStringMap(out, props);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobName = U.readString(in);
- user = U.readString(in);
-
- hasCombiner = in.readBoolean();
- numReduces = in.readInt();
-
- props = U.readStringMap(in);
- }
-
- /**
- * @return Properties of the job.
- */
- public Map<String, String> properties() {
- return props;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java
deleted file mode 100644
index 55e3690..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Hadoop facade implementation.
- */
-public class GridHadoopImpl implements GridHadoop {
- /** Hadoop processor. */
- private final IgniteHadoopProcessor proc;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /**
- * Constructor.
- *
- * @param proc Hadoop processor.
- */
- GridHadoopImpl(IgniteHadoopProcessor proc) {
- this.proc = proc;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopConfiguration configuration() {
- return proc.config();
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobId nextJobId() {
- if (busyLock.enterBusy()) {
- try {
- return proc.nextJobId();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
- if (busyLock.enterBusy()) {
- try {
- return proc.submit(jobId, jobInfo);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to submit job (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.status(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job status (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.counters(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job counters (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.finishFuture(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.kill(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to kill job (grid is stopping).");
- }
-}