You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 11:09:30 UTC
[35/63] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented
new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
new file mode 100644
index 0000000..1fda1c3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.IgniteState.STARTED;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
+
+/**
+ * Wrapper for IGFS server.
+ */
+public class HadoopIgfsWrapper implements HadoopIgfs {
+ /** Delegate. */
+ private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
+
+ /** Authority. */
+ private final String authority;
+
+ /** Connection string. */
+ private final HadoopIgfsEndpoint endpoint;
+
+ /** Log directory. */
+ private final String logDir;
+
+ /** Configuration. */
+ private final Configuration conf;
+
+ /** Logger. */
+ private final Log log;
+
+ /** The user name this wrapper works on behalf of. */
+ private final String userName;
+
+ /**
+ * Constructor.
+ *
+ * @param authority Authority (connection string).
+ * @param logDir Log directory for server.
+ * @param conf Configuration.
+ * @param log Current logger.
+ */
+ public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+ throws IOException {
+ try {
+ this.authority = authority;
+ this.endpoint = new HadoopIgfsEndpoint(authority);
+ this.logDir = logDir;
+ this.conf = conf;
+ this.log = log;
+ this.userName = user;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to parse endpoint: " + authority, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
+ @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) {
+ return hndResp;
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(boolean force) {
+ Delegate delegate = delegateRef.get();
+
+ if (delegate != null && delegateRef.compareAndSet(delegate, null))
+ delegate.close(force);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile info(final IgfsPath path) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+ @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.info(path);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+ @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.update(path, props);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
+ throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Boolean>() {
+ @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.setTimes(path, accessTime, modificationTime);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Boolean>() {
+ @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.rename(src, dest);
+ }
+ }, src);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Boolean>() {
+ @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.delete(path, recursive);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
+ final long len) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
+ @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.affinity(path, start, len);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
+ @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.contentSummary(path);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Boolean>() {
+ @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.mkdirs(path, props);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
+ @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.listFiles(path);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
+ @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.listPaths(path);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsStatus fsStatus() throws IOException {
+ return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
+ @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+ throws IgniteCheckedException, IOException {
+ return hadoop.fsStatus();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.open(path);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+ throws IOException {
+ return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.open(path, seqReadsBeforePrefetch);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
+ final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
+ throws IOException {
+ return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
+ }
+ }, path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+ @Nullable final Map<String, String> props) throws IOException {
+ return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+ IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+ return hadoop.append(path, create, props);
+ }
+ }, path);
+ }
+
+ /**
+ * Execute closure which is not path-specific.
+ *
+ * @param clo Closure.
+ * @return Result.
+ * @throws IOException If failed.
+ */
+ private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
+ return withReconnectHandling(clo, null);
+ }
+
+ /**
+ * Execute closure.
+ *
+ * @param clo Closure.
+ * @param path Path for exceptions.
+ * @return Result.
+ * @throws IOException If failed.
+ */
+ private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
+ throws IOException {
+ Exception err = null;
+
+ for (int i = 0; i < 2; i++) {
+ Delegate curDelegate = null;
+
+ boolean close = false;
+ boolean force = false;
+
+ try {
+ curDelegate = delegate();
+
+ assert curDelegate != null;
+
+ close = curDelegate.doomed;
+
+ return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
+ }
+ catch (HadoopIgfsCommunicationException e) {
+ if (curDelegate != null && !curDelegate.doomed) {
+ // Try getting rid fo faulty delegate ASAP.
+ delegateRef.compareAndSet(curDelegate, null);
+
+ close = true;
+ force = true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to send message to a server: " + e);
+
+ err = e;
+ }
+ catch (IgniteCheckedException e) {
+ throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
+ }
+ finally {
+ if (close) {
+ assert curDelegate != null;
+
+ curDelegate.close(force);
+ }
+ }
+ }
+
+ List<Throwable> list = X.getThrowableList(err);
+
+ Throwable cause = list.get(list.size() - 1);
+
+ throw new IOException("Failed to communicate with IGFS: "
+ + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err);
+ }
+
+ /**
+ * Get delegate creating it if needed.
+ *
+ * @return Delegate.
+ */
+ private Delegate delegate() throws HadoopIgfsCommunicationException {
+ // These fields will contain possible exceptions from shmem and TCP endpoints.
+ Exception errShmem = null;
+ Exception errTcp = null;
+
+ // 1. If delegate is set, return it immediately.
+ Delegate curDelegate = delegateRef.get();
+
+ if (curDelegate != null)
+ return curDelegate;
+
+ // 2. Guess that we are in the same VM.
+ boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false);
+
+ if (!skipInProc) {
+ IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs());
+
+ if (igfs != null) {
+ HadoopIgfsEx hadoop = null;
+
+ try {
+ hadoop = new HadoopIgfsInProc(igfs, log, userName);
+
+ curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (e instanceof HadoopIgfsCommunicationException)
+ if (hadoop != null)
+ hadoop.close(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
+ }
+ }
+ }
+
+ // 3. Try connecting using shmem.
+ boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
+
+ if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
+ HadoopIgfsEx hadoop = null;
+
+ try {
+ hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
+
+ curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (e instanceof HadoopIgfsCommunicationException)
+ hadoop.close(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
+
+ errShmem = e;
+ }
+ }
+
+ // 4. Try local TCP connection.
+ boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+
+ if (curDelegate == null && !skipLocTcp) {
+ HadoopIgfsEx hadoop = null;
+
+ try {
+ hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+ log, userName);
+
+ curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (e instanceof HadoopIgfsCommunicationException)
+ hadoop.close(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+ ", port=" + endpoint.port() + ']', e);
+
+ errTcp = e;
+ }
+ }
+
+ // 5. Try remote TCP connection.
+ if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+ HadoopIgfsEx hadoop = null;
+
+ try {
+ hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+ log, userName);
+
+ curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (e instanceof HadoopIgfsCommunicationException)
+ hadoop.close(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+ ", port=" + endpoint.port() + ']', e);
+
+ errTcp = e;
+ }
+ }
+
+ if (curDelegate != null) {
+ if (!delegateRef.compareAndSet(null, curDelegate))
+ curDelegate.doomed = true;
+
+ return curDelegate;
+ }
+ else {
+ SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=[");
+
+ if (errShmem != null)
+ errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
+
+ errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
+
+ errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " +
+ "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
+
+ throw new HadoopIgfsCommunicationException(errMsg.toString());
+ }
+ }
+
+ /**
+ * File system operation closure.
+ */
+ private static interface FileSystemClosure<T> {
+ /**
+ * Call closure body.
+ *
+ * @param hadoop RPC handler.
+ * @param hndResp Handshake response.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
+ }
+
+ /**
+ * Delegate.
+ */
+ private static class Delegate {
+ /** RPC handler. */
+ private final HadoopIgfsEx hadoop;
+
+ /** Handshake request. */
+ private final IgfsHandshakeResponse hndResp;
+
+ /** Close guard. */
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Whether this delegate must be closed at the end of the next invocation. */
+ private boolean doomed;
+
+ /**
+ * Constructor.
+ *
+ * @param hadoop Hadoop.
+ * @param hndResp Handshake response.
+ */
+ private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
+ this.hadoop = hadoop;
+ this.hndResp = hndResp;
+ }
+
+ /**
+ * Close underlying RPC handler.
+ *
+ * @param force Force flag.
+ */
+ private void close(boolean force) {
+ if (closeGuard.compareAndSet(false, true))
+ hadoop.close(force);
+ }
+ }
+
+ /**
+ * Helper method to find Igfs of the given name in the given Ignite instance.
+ *
+ * @param gridName The name of the grid to check.
+ * @param igfsName The name of Igfs.
+ * @return The file system instance, or null if not found.
+ */
+ private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) {
+ if (Ignition.state(gridName) == STARTED) {
+ try {
+ for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) {
+ if (F.eq(fs.name(), igfsName))
+ return (IgfsEx)fs;
+ }
+ }
+ catch (IgniteIllegalStateException ignore) {
+ // May happen if the grid state has changed:
+ }
+ }
+
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
new file mode 100644
index 0000000..be2aa09
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.proto;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientException;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceCounters;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobCountersTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobStatusTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolKillJobTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolNextTaskIdTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolSubmitJobTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolTaskArguments;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.io.IOException;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Hadoop client protocol.
+ */
+public class HadoopClientProtocol implements ClientProtocol {
+ /** Protocol version. */
+ private static final long PROTO_VER = 1L;
+
+ /** Default Ignite system directory. */
+ private static final String SYS_DIR = ".ignite/system";
+
+ /** Configuration. */
+ private final Configuration conf;
+
+ /** Ignite client. */
+ private volatile GridClient cli;
+
+ /** Last received version. */
+ private long lastVer = -1;
+
+ /** Last received status. */
+ private HadoopJobStatus lastStatus;
+
+ /**
+ * Constructor.
+ *
+ * @param conf Configuration.
+ * @param cli Ignite client.
+ */
+ public HadoopClientProtocol(Configuration conf, GridClient cli) {
+ assert cli != null;
+
+ this.conf = conf;
+ this.cli = cli;
+ }
+
+ /** {@inheritDoc} */
+ @Override public JobID getNewJobID() throws IOException, InterruptedException {
+ try {
+ conf.setLong(HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
+
+ HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null);
+
+ conf.setLong(HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
+
+ return new JobID(jobID.globalId().toString(), jobID.localId());
+ }
+ catch (GridClientException e) {
+ throw new IOException("Failed to get new job ID.", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
+ InterruptedException {
+ try {
+ conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
+
+ HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(),
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
+
+ if (status == null)
+ throw new IOException("Failed to submit job (null status obtained): " + jobId);
+
+ return processStatus(status);
+ }
+ catch (GridClientException | IgniteCheckedException e) {
+ throw new IOException("Failed to submit job.", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
+ return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
+ return Cluster.JobTrackerStatus.RUNNING;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ return new AccessControlList("*");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
+ try {
+ cli.compute().execute(HadoopProtocolKillJobTask.class.getName(),
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+ }
+ catch (GridClientException e) {
+ throw new IOException("Failed to kill job: " + jobId, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
+ InterruptedException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
+ try {
+ Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
+
+ HadoopProtocolTaskArguments args = delay >= 0 ?
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
+
+ HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args);
+
+ if (status == null)
+ throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
+
+ return processStatus(status);
+ }
+ catch (GridClientException e) {
+ throw new IOException("Failed to get job status: " + jobId, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
+ try {
+ final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(),
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+
+ if (counters == null)
+ throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
+
+ return new HadoopMapReduceCounters(counters);
+ }
+ catch (GridClientException e) {
+ throw new IOException("Failed to get job counters: " + jobId, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
+ return new TaskReport[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getFilesystemName() throws IOException, InterruptedException {
+ return FileSystem.get(conf).getUri().toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return new JobStatus[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+ throws IOException, InterruptedException {
+ return new TaskCompletionEvent[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
+ return new String[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
+ return new TaskTrackerInfo[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
+ return new TaskTrackerInfo[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSystemDir() throws IOException, InterruptedException {
+ Path sysDir = new Path(SYS_DIR);
+
+ return sysDir.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStagingAreaDir() throws IOException, InterruptedException {
+ String usr = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ return HadoopUtils.stagingAreaDir(conf, usr).toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getJobHistoryDir() throws IOException, InterruptedException {
+ return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return new QueueInfo[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
+ return new QueueAclsInfo[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return new QueueInfo[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
+ return new QueueInfo[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
+ InterruptedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
+ InterruptedException {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
+ InterruptedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
+ InterruptedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return PROTO_VER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
+ throws IOException {
+ return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
+ }
+
+ /**
+ * Process received status update.
+ *
+ * @param status Ignite status.
+ * @return Hadoop status.
+ */
+ private JobStatus processStatus(HadoopJobStatus status) {
+ // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because
+ // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class
+ // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will
+ // change in future and either protocol will serve statuses for several jobs or status update will not be
+ // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap).
+ // (vozerov)
+ if (lastVer < status.version()) {
+ lastVer = status.version();
+
+ lastStatus = status;
+ }
+ else
+ assert lastStatus != null;
+
+ return HadoopUtils.status(lastStatus, conf);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java
new file mode 100644
index 0000000..ddf6c29
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
+
+import java.io.IOException;
+
+/**
+ * Hadoop cleanup task implementation for v1 API.
+ *
+ * <p>Runs the job-level phase of the configured {@link OutputCommitter}: commits the job output,
+ * or aborts it when the {@code abort} flag is set.
+ */
+public class HadoopV1CleanupTask extends HadoopV1Task {
+ /** Abort flag. */
+ private final boolean abort;
+
+ /**
+ * @param taskInfo Task info.
+ * @param abort Abort flag: {@code true} to abort the job output, {@code false} to commit it.
+ */
+ public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) {
+ super(taskInfo);
+
+ this.abort = abort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ // Cleanup requires v2 context facilities (job conf / job context), hence the cast.
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobContext jobCtx = ctx.jobContext();
+
+ try {
+ OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
+
+ if (abort)
+ // NOTE(review): abort always reports state FAILED, even for killed jobs — confirm intended.
+ committer.abortJob(jobCtx, JobStatus.State.FAILED);
+ else
+ committer.commitJob(jobCtx);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java
new file mode 100644
index 0000000..d91730f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Counter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
+
+/**
+ * Hadoop counter implementation for v1 API.
+ *
+ * <p>Thin adapter that exposes an Ignite {@link HadoopLongCounter} through the legacy
+ * {@code mapred} {@link Counters.Counter} interface. All reads and writes delegate to the
+ * wrapped counter; Hadoop's own serialization of this object is not supported.
+ */
+public class HadoopV1Counter extends Counters.Counter {
+ /** Delegate. */
+ private final HadoopLongCounter cntr;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntr Delegate counter.
+ */
+ public HadoopV1Counter(HadoopLongCounter cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op: display name always mirrors the counter name.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return cntr.name();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return getName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getValue() {
+ return cntr.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setValue(long val) {
+ cntr.value(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void increment(long incr) {
+ cntr.increment(incr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ // Hadoop Writable serialization is deliberately unsupported; counter state travels via Ignite instead.
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ // See write(): Writable deserialization is deliberately unsupported.
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String makeEscapedCompactString() {
+ // Reuse the v2 adapter so formatting matches Hadoop's standard escaped-compact form.
+ return toEscapedCompactString(new HadoopV2Counter(cntr));
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public boolean contentEquals(Counters.Counter cntr) {
+ // NOTE(review): getUnderlyingCounter() returns this, and equals() is not overridden here,
+ // so this is effectively reference comparison of underlying counters — confirm intended.
+ return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCounter() {
+ return cntr.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter getUnderlyingCounter() {
+ return this;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
new file mode 100644
index 0000000..65ff280
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
+
+/**
+ * Hadoop map task implementation for v1 API.
+ *
+ * <p>Resolves the native input split, instantiates the configured {@code mapred} {@link Mapper}
+ * and drives it over the split's records, emitting output through {@link HadoopV1OutputCollector}.
+ */
+public class HadoopV1MapTask extends HadoopV1Task {
+ /** Empty host array used when split locations are not needed for reading. */
+ private static final String[] EMPTY_HOSTS = new String[0];
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopJob job = taskCtx.job();
+
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobConf jobConf = ctx.jobConf();
+
+ InputFormat inFormat = jobConf.getInputFormat();
+
+ HadoopInputSplit split = info().inputSplit();
+
+ InputSplit nativeSplit;
+
+ if (split instanceof HadoopFileBlock) {
+ // File-backed split: reconstruct a native FileSplit directly from the block coordinates.
+ HadoopFileBlock block = (HadoopFileBlock)split;
+
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ // Otherwise the wrapped native split is deserialized by the v2 context.
+ nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+
+ assert nativeSplit != null;
+
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
+
+ HadoopV1OutputCollector collector = null;
+
+ try {
+ // Direct write (straight to final output) only when no combine/reduce phase follows the map.
+ collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), ctx.attemptId());
+
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
+
+ try {
+ try {
+ while (reader.next(key, val)) {
+ // Cooperative cancellation check once per record.
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
+ }
+ finally {
+ mapper.close();
+ }
+ }
+ finally {
+ collector.closeWriter();
+ }
+
+ collector.commit();
+ }
+ catch (Exception e) {
+ // Best-effort abort of any partially written output before propagating the failure.
+ if (collector != null)
+ collector.abort();
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java
new file mode 100644
index 0000000..1a3c4bd
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1OutputCollector.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Hadoop output collector.
+ *
+ * <p>In direct-write mode records go straight to the job's {@link OutputFormat} record writer;
+ * otherwise they are forwarded to the Ignite task context output. The setup/commit/abort
+ * lifecycle methods only interact with the {@link OutputCommitter} when a direct writer exists.
+ */
+public class HadoopV1OutputCollector implements OutputCollector {
+ /** Job configuration. */
+ private final JobConf jobConf;
+
+ /** Task context. */
+ private final HadoopTaskContext taskCtx;
+
+ /** Optional direct writer. */
+ private final RecordWriter writer;
+
+ /** Task attempt. */
+ private final TaskAttemptID attempt;
+
+ /**
+ * @param jobConf Job configuration.
+ * @param taskCtx Task context.
+ * @param directWrite Direct write flag.
+ * @param fileName File name.
+ * @param attempt Task attempt ID.
+ * @throws IOException In case of IO exception.
+ */
+ HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
+ @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+ this.jobConf = jobConf;
+ this.taskCtx = taskCtx;
+ this.attempt = attempt;
+
+ if (directWrite) {
+ // Output format implementations read the attempt id from the configuration.
+ jobConf.set("mapreduce.task.attempt.id", attempt.toString());
+
+ OutputFormat outFormat = jobConf.getOutputFormat();
+
+ // NOTE(review): FileSystem argument is passed as null — presumably unused by the formats
+ // supported here; confirm against the configured OutputFormat implementations.
+ writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
+ }
+ else
+ writer = null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void collect(Object key, Object val) throws IOException {
+ if (writer != null)
+ writer.write(key, val);
+ else {
+ try {
+ taskCtx.output().write(key, val);
+ }
+ catch (IgniteCheckedException e) {
+ // Adapt to the IOException-based contract of OutputCollector.
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Close writer.
+ *
+ * <p>No-op when there is no direct writer.
+ *
+ * @throws IOException In case of IO exception.
+ */
+ public void closeWriter() throws IOException {
+ if (writer != null)
+ writer.close(Reporter.NULL);
+ }
+
+ /**
+ * Setup task.
+ *
+ * <p>Invokes the output committer's task setup; no-op when there is no direct writer.
+ *
+ * @throws IOException If failed.
+ */
+ public void setup() throws IOException {
+ if (writer != null)
+ jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
+ }
+
+ /**
+ * Commit task.
+ *
+ * <p>Commits through the output committer only if it reports a commit is needed;
+ * no-op when there is no direct writer.
+ *
+ * @throws IOException If failed.
+ */
+ public void commit() throws IOException {
+ if (writer != null) {
+ OutputCommitter outputCommitter = jobConf.getOutputCommitter();
+
+ TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
+
+ if (outputCommitter.needsTaskCommit(taskCtx))
+ outputCommitter.commitTask(taskCtx);
+ }
+ }
+
+ /**
+ * Abort task.
+ *
+ * <p>Best-effort: any {@link IOException} from the committer is deliberately swallowed,
+ * since abort runs on a failure path where the original error must be preserved.
+ */
+ public void abort() {
+ try {
+ if (writer != null)
+ jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
+ }
+ catch (IOException ignore) {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java
new file mode 100644
index 0000000..97634d9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+
+/**
+ * Hadoop partitioner adapter for v1 API.
+ *
+ * <p>Wraps a user-provided {@code mapred} {@link Partitioner}, instantiated reflectively
+ * from the job configuration, behind the Ignite {@link HadoopPartitioner} interface.
+ */
+public class HadoopV1Partitioner implements HadoopPartitioner {
+ /** Partitioner instance. */
+ private Partitioner<Object, Object> part;
+
+ /**
+ * @param cls Hadoop partitioner class.
+ * @param conf Job configuration.
+ */
+ public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
+ // ReflectionUtils also injects the configuration if the class is Configurable/JobConfigurable.
+ part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key, Object val, int parts) {
+ return part.getPartition(key, val, parts);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
new file mode 100644
index 0000000..92c024e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
+
+/**
+ * Hadoop reduce task implementation for v1 API.
+ *
+ * <p>Serves both the reduce and the combine phase: the {@code reduce} flag selects whether the
+ * job's reducer or combiner class is instantiated and driven over the task input.
+ */
+public class HadoopV1ReduceTask extends HadoopV1Task {
+ /** {@code True} if reduce, {@code false} if combine. */
+ private final boolean reduce;
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ * @param reduce {@code True} if reduce, {@code false} if combine.
+ */
+ public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
+ super(taskInfo);
+
+ this.reduce = reduce;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopJob job = taskCtx.job();
+
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobConf jobConf = ctx.jobConf();
+
+ HadoopTaskInput input = taskCtx.input();
+
+ HadoopV1OutputCollector collector = null;
+
+ try {
+ // Direct write for the reduce phase, or for a combiner when no reducer follows it.
+ collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+
+ // Reduce phase uses the reducer class; combine phase uses the combiner class.
+ Reducer reducer;
+ if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+ jobConf);
+ else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
+ try {
+ try {
+ while (input.next()) {
+ // Cooperative cancellation check once per key group.
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
+ }
+ finally {
+ reducer.close();
+ }
+ }
+ finally {
+ collector.closeWriter();
+ }
+
+ collector.commit();
+ }
+ catch (Exception e) {
+ // Best-effort abort of any partially written output before propagating the failure.
+ if (collector != null)
+ collector.abort();
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java
new file mode 100644
index 0000000..f3229e2
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+
+/**
+ * Hadoop reporter implementation for v1 API.
+ *
+ * <p>Bridges {@code mapred} {@link Reporter} calls onto Ignite task counters. Status and
+ * progress reporting are stubbed out (see the TODO markers on the individual methods).
+ */
+public class HadoopV1Reporter implements Reporter {
+ /** Context. */
+ private final HadoopTaskContext ctx;
+
+ /**
+ * Creates new instance.
+ *
+ * @param ctx Context.
+ */
+ public HadoopV1Reporter(HadoopTaskContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setStatus(String status) {
+ // TODO: status reporting is not implemented; the status string is dropped.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters.Counter getCounter(Enum<?> name) {
+ // Enum counters map to (declaring class name, constant name) counter coordinates.
+ return getCounter(name.getDeclaringClass().getName(), name.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters.Counter getCounter(String grp, String name) {
+ return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrCounter(Enum<?> key, long amount) {
+ getCounter(key).increment(amount);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrCounter(String grp, String cntr, long amount) {
+ getCounter(grp, cntr).increment(amount);
+ }
+
+ /** {@inheritDoc} */
+ @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("reporter has no input"); // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getProgress() {
+ // TODO: real progress is not tracked; a constant placeholder value is returned.
+ return 0.5f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void progress() {
+ // TODO: liveness signalling is not implemented.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java
new file mode 100644
index 0000000..2fd7332
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
+
+import java.io.IOException;
+
+/**
+ * Hadoop setup task implementation for v1 API.
+ *
+ * <p>Validates the job's output specification and runs the job-level setup of the
+ * configured {@link OutputCommitter}, if any.
+ */
+public class HadoopV1SetupTask extends HadoopV1Task {
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ public HadoopV1SetupTask(HadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ try {
+ // NOTE(review): FileSystem argument is passed as null — presumably ignored by the
+ // supported OutputFormat implementations; confirm for custom formats.
+ ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
+
+ OutputCommitter committer = ctx.jobConf().getOutputCommitter();
+
+ if (committer != null)
+ committer.setupJob(ctx.jobContext());
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
new file mode 100644
index 0000000..11a3598
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Hadoop API v1 splitter.
+ *
+ * <p>Static utilities that map native {@code mapred} input splits to Ignite's
+ * {@link HadoopInputSplit} representation, and read serialized {@link FileSplit}s back
+ * into {@link HadoopFileBlock}s.
+ */
+public class HadoopV1Splitter {
+ /** Shared empty host list used when no locations are provided. */
+ private static final String[] EMPTY_HOSTS = {};
+
+ /**
+ * Computes the job's input splits via its configured {@link InputFormat} and wraps them.
+ *
+ * @param jobConf Job configuration.
+ * @return Collection of mapped splits.
+ * @throws IgniteCheckedException If mapping failed.
+ */
+ public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
+ try {
+ InputFormat<?, ?> format = jobConf.getInputFormat();
+
+ assert format != null;
+
+ // numSplits hint of 0 leaves the split count entirely to the input format.
+ InputSplit[] splits = format.getSplits(jobConf, 0);
+
+ Collection<HadoopInputSplit> res = new ArrayList<>(splits.length);
+
+ for (int i = 0; i < splits.length; i++) {
+ InputSplit nativeSplit = splits[i];
+
+ if (nativeSplit instanceof FileSplit) {
+ // File splits are converted losslessly to lightweight file blocks.
+ FileSplit s = (FileSplit)nativeSplit;
+
+ res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
+ }
+ else
+ // Any other split type is carried opaquely, keyed by its ordinal position.
+ res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+ }
+
+ return res;
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * @param clsName Input split class name.
+ * @param in Input stream positioned at the serialized split fields.
+ * @param hosts Optional hosts.
+ * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
+ @Nullable String[] hosts) throws IgniteCheckedException {
+ // Only exact FileSplit instances are handled here; subclasses fall through to the caller.
+ if (!FileSplit.class.getName().equals(clsName))
+ return null;
+
+ FileSplit split = U.newInstance(FileSplit.class);
+
+ try {
+ split.readFields(in);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ if (hosts == null)
+ hosts = EMPTY_HOSTS;
+
+ return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
new file mode 100644
index 0000000..4ed5eb3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v1;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * Extended Hadoop v1 task.
+ */
+public abstract class HadoopV1Task extends HadoopTask {
+ /** Indicates that this task is to be cancelled. */
+ private volatile boolean cancelled;
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ protected HadoopV1Task(HadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /**
+ * Gets file name for that task result.
+ *
+ * @return File name.
+ */
+ public String fileName() {
+ NumberFormat numFormat = NumberFormat.getInstance();
+
+ numFormat.setMinimumIntegerDigits(5);
+ numFormat.setGroupingUsed(false);
+
+ return "part-" + numFormat.format(info().taskNumber());
+ }
+
+ /**
+ *
+ * @param jobConf Job configuration.
+ * @param taskCtx Task context.
+ * @param directWrite Direct write flag.
+ * @param fileName File name.
+ * @param attempt Attempt of task.
+ * @return Collector.
+ * @throws IOException In case of IO exception.
+ */
+ protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
+ boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+ HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
+ fileName, attempt) {
+ /** {@inheritDoc} */
+ @Override public void collect(Object key, Object val) throws IOException {
+ if (cancelled)
+ throw new HadoopTaskCancelledException("Task cancelled.");
+
+ super.collect(key, val);
+ }
+ };
+
+ collector.setup();
+
+ return collector;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ cancelled = true;
+ }
+
+ /** Returns true if task is cancelled. */
+ public boolean isCancelled() {
+ return cancelled;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java
new file mode 100644
index 0000000..ea7128c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
/**
 * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class.
 */
@SuppressWarnings("UnusedDeclaration")
public class HadoopDaemon extends Thread {
    /** Lock guarding access to the {@link #daemons} registry. */
    private static final Object lock = new Object();

    /** Registry of threads to stop upon task end; {@code null} once cleared. */
    private static Collection<HadoopDaemon> daemons = new LinkedList<>();

    {
        setDaemon(true); // always a daemon
    }

    /** Runnable of this thread, may be this. */
    final Runnable runnable;

    /**
     * Construct a daemon thread that runs itself.
     */
    public HadoopDaemon() {
        runnable = this;

        enqueueIfNeeded();
    }

    /**
     * Construct a daemon thread wrapping the given runnable.
     *
     * @param runnable Runnable to execute.
     */
    public HadoopDaemon(Runnable runnable) {
        super(runnable);

        this.runnable = runnable;

        setName(runnable.toString());

        enqueueIfNeeded();
    }

    /**
     * Construct a daemon thread to be part of a specified thread group.
     *
     * @param grp Thread group.
     * @param runnable Runnable to execute.
     */
    public HadoopDaemon(ThreadGroup grp, Runnable runnable) {
        super(grp, runnable);

        this.runnable = runnable;

        setName(runnable.toString());

        enqueueIfNeeded();
    }

    /**
     * Getter for the runnable. May return this.
     *
     * @return The runnable.
     */
    public Runnable getRunnable() {
        return runnable;
    }

    /**
     * Checks whether the runnable is a Hadoop {@code org.apache.hadoop.hdfs.PeerCache} runnable.
     *
     * @param r The runnable.
     * @return {@code True} if it is.
     */
    private static boolean isPeerCacheRunnable(Runnable r) {
        return r.getClass().getName().startsWith("org.apache.hadoop.hdfs.PeerCache");
    }

    /**
     * Enqueue this thread if it should be stopped upon the task end.
     */
    private void enqueueIfNeeded() {
        // Only track runnables loaded by the same class loader as this class.
        boolean sameClsLdr = runnable.getClass().getClassLoader() == getClass().getClassLoader();

        synchronized (lock) {
            if (daemons == null)
                throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " +
                    "[classLoader=" + getClass().getClassLoader() + ']');

            if (sameClsLdr && isPeerCacheRunnable(runnable))
                daemons.add(this);
        }
    }

    /**
     * Stops all the registered threads.
     */
    public static void dequeueAndStopAll() {
        synchronized (lock) {
            if (daemons == null)
                return;

            for (HadoopDaemon daemon : daemons)
                daemon.interrupt();

            daemons = null;
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java
new file mode 100644
index 0000000..e045dba
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The wrapper around external serializer.
+ */
+public class HadoopSerializationWrapper<T> implements HadoopSerialization {
+ /** External serializer - writer. */
+ private final Serializer<T> serializer;
+
+ /** External serializer - reader. */
+ private final Deserializer<T> deserializer;
+
+ /** Data output for current write operation. */
+ private OutputStream currOut;
+
+ /** Data input for current read operation. */
+ private InputStream currIn;
+
+ /** Wrapper around current output to provide OutputStream interface. */
+ private final OutputStream outStream = new OutputStream() {
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ currOut.write(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] b, int off, int len) throws IOException {
+ currOut.write(b, off, len);
+ }
+ };
+
+ /** Wrapper around current input to provide InputStream interface. */
+ private final InputStream inStream = new InputStream() {
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ return currIn.read();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] b, int off, int len) throws IOException {
+ return currIn.read(b, off, len);
+ }
+ };
+
+ /**
+ * @param serialization External serializer to wrap.
+ * @param cls The class to serialize.
+ */
+ public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
+ assert cls != null;
+
+ serializer = serialization.getSerializer(cls);
+ deserializer = serialization.getDeserializer(cls);
+
+ try {
+ serializer.open(outStream);
+ deserializer.open(inStream);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+ assert out != null;
+ assert obj != null;
+
+ try {
+ currOut = (OutputStream)out;
+
+ serializer.serialize((T)obj);
+
+ currOut = null;
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+ assert in != null;
+
+ try {
+ currIn = (InputStream)in;
+
+ T res = deserializer.deserialize((T) obj);
+
+ currIn = null;
+
+ return res;
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ try {
+ serializer.close();
+ deserializer.close();
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java
new file mode 100644
index 0000000..6d947e8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
 * Fake manager for shutdown hooks.
 * <p>
 * Registered hooks are only tracked, never executed; the priority argument is ignored
 * and the shutdown-in-progress flag is never raised by this implementation.
 */
public class HadoopShutdownHookManager {
    /** Singleton instance. */
    private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager();

    /**
     * Return <code>ShutdownHookManager</code> singleton.
     *
     * @return <code>ShutdownHookManager</code> singleton.
     */
    public static HadoopShutdownHookManager get() {
        return MGR;
    }

    /** Registered hooks (thread-safe via synchronized set). */
    private final Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());

    /** Shutdown-in-progress flag; never set to {@code true} by this fake implementation. */
    private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);

    /**
     * Singleton.
     */
    private HadoopShutdownHookManager() {
        // No-op.
    }

    /**
     * Adds a shutdownHook with a priority, the higher the priority
     * the earlier will run. ShutdownHooks with same priority run
     * in a non-deterministic order.
     *
     * @param shutdownHook shutdownHook <code>Runnable</code>
     * @param priority priority of the shutdownHook (ignored by this fake implementation).
     * @throws IllegalArgumentException If {@code shutdownHook} is {@code null}.
     */
    public void addShutdownHook(Runnable shutdownHook, int priority) {
        if (shutdownHook == null)
            throw new IllegalArgumentException("shutdownHook cannot be NULL");

        hooks.add(shutdownHook);
    }

    /**
     * Removes a shutdownHook.
     *
     * @param shutdownHook shutdownHook to remove.
     * @return TRUE if the shutdownHook was registered and removed,
     * FALSE otherwise.
     */
    public boolean removeShutdownHook(Runnable shutdownHook) {
        return hooks.remove(shutdownHook);
    }

    /**
     * Indicates if a shutdownHook is registered or not.
     *
     * @param shutdownHook shutdownHook to check if registered.
     * @return TRUE/FALSE depending if the shutdownHook is is registered.
     */
    public boolean hasShutdownHook(Runnable shutdownHook) {
        return hooks.contains(shutdownHook);
    }

    /**
     * Indicates if shutdown is in progress or not.
     *
     * @return TRUE if the shutdown is in progress, otherwise FALSE.
     */
    public boolean isShutdownInProgress() {
        return shutdownInProgress.get();
    }
}
\ No newline at end of file