Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:53:38 UTC
[46/92] [abbrv] [partial] ignite git commit: Moving classes around.
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java
new file mode 100644
index 0000000..453d23f
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Abstract class for all Hadoop components.
+ */
+public abstract class HadoopComponent {
+ /** Hadoop context. */
+ protected HadoopContext ctx;
+
+ /** Logger. */
+ protected IgniteLogger log;
+
+ /**
+ * Starts the component.
+ *
+ * @param ctx Hadoop context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void start(HadoopContext ctx) throws IgniteCheckedException {
+ this.ctx = ctx;
+
+ log = ctx.kernalContext().log(getClass());
+ }
+
+ /**
+ * Stops the component.
+ *
+ * @param cancel Cancel flag.
+ */
+ public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /**
+ * Callback invoked when all grid components are started.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onKernalStart() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
+ * Callback invoked before all grid components are stopped.
+ *
+ * @param cancel Cancel flag.
+ */
+ public void onKernalStop(boolean cancel) {
+ // No-op.
+ }
+}
\ No newline at end of file
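
For reference, a minimal sketch of a subclass plugging into the lifecycle defined above. The class name is hypothetical and is not part of this commit; it assumes the same package as HadoopComponent:

import org.apache.ignite.IgniteCheckedException;

/** Hypothetical component illustrating the lifecycle contract. */
public class HadoopDummyComponent extends HadoopComponent {
    /** {@inheritDoc} */
    @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
        super.start(ctx); // Stores the context and initializes the logger.

        log.info("Started on node: " + ctx.localNodeId());
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) {
        if (log != null)
            log.info("Stopped (cancel=" + cancel + ").");
    }
}
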
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java
new file mode 100644
index 0000000..7c1bf0b
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Hadoop accelerator context.
+ */
+public class HadoopContext {
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Hadoop configuration. */
+ private HadoopConfiguration cfg;
+
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** External task executor. */
+ private HadoopTaskExecutorAdapter taskExecutor;
+
+ /** Shuffle. */
+ private HadoopShuffle shuffle;
+
+ /** Components list. */
+ private List<HadoopComponent> components = new ArrayList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ * @param cfg Hadoop configuration.
+ * @param jobTracker Job tracker.
+ * @param taskExecutor Task executor.
+ * @param shuffle Shuffle.
+ */
+ public HadoopContext(
+ GridKernalContext ctx,
+ HadoopConfiguration cfg,
+ HadoopJobTracker jobTracker,
+ HadoopTaskExecutorAdapter taskExecutor,
+ HadoopShuffle shuffle
+ ) {
+ this.ctx = ctx;
+ this.cfg = cfg;
+
+ this.jobTracker = add(jobTracker);
+ this.taskExecutor = add(taskExecutor);
+ this.shuffle = add(shuffle);
+ }
+
+ /**
+ * Gets list of components.
+ *
+ * @return List of components.
+ */
+ public List<HadoopComponent> components() {
+ return components;
+ }
+
+ /**
+ * Gets kernal context.
+ *
+ * @return Grid kernal context instance.
+ */
+ public GridKernalContext kernalContext() {
+ return ctx;
+ }
+
+ /**
+ * Gets Hadoop configuration.
+ *
+ * @return Hadoop configuration.
+ */
+ public HadoopConfiguration configuration() {
+ return cfg;
+ }
+
+ /**
+ * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
+ *
+ * @return Local node ID.
+ */
+ public UUID localNodeId() {
+ return ctx.localNodeId();
+ }
+
+ /**
+ * Gets local node order.
+ *
+ * @return Local node order.
+ */
+ public long localNodeOrder() {
+ assert ctx.discovery() != null;
+
+ return ctx.discovery().localNode().order();
+ }
+
+ /**
+ * @return Hadoop-enabled nodes.
+ */
+ public Collection<ClusterNode> nodes() {
+ return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx());
+ }
+
+ /**
+ * @return {@code True} if local node is the job update leader, i.e. the Hadoop-enabled node with the
+ * minimum topology order.
+ */
+ public boolean jobUpdateLeader() {
+ long minOrder = Long.MAX_VALUE;
+ ClusterNode minOrderNode = null;
+
+ for (ClusterNode node : nodes()) {
+ if (node.order() < minOrder) {
+ minOrder = node.order();
+ minOrderNode = node;
+ }
+ }
+
+ assert minOrderNode != null;
+
+ return localNodeId().equals(minOrderNode.id());
+ }
+
+ /**
+ * @param meta Job metadata.
+ * @return {@code true} if local node is participating in job execution.
+ */
+ public boolean isParticipating(HadoopJobMetadata meta) {
+ UUID locNodeId = localNodeId();
+
+ if (locNodeId.equals(meta.submitNodeId()))
+ return true;
+
+ HadoopMapReducePlan plan = meta.mapReducePlan();
+
+ return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
+ }
+
+ /**
+ * @return Job tracker instance.
+ */
+ public HadoopJobTracker jobTracker() {
+ return jobTracker;
+ }
+
+ /**
+ * @return Task executor.
+ */
+ public HadoopTaskExecutorAdapter taskExecutor() {
+ return taskExecutor;
+ }
+
+ /**
+ * @return Shuffle.
+ */
+ public HadoopShuffle shuffle() {
+ return shuffle;
+ }
+
+ /**
+ * @return Map-reduce planner.
+ */
+ public HadoopMapReducePlanner planner() {
+ return cfg.getMapReducePlanner();
+ }
+
+ /**
+ * Adds component.
+ *
+ * @param c Component to add.
+ * @return Added component.
+ */
+ private <C extends HadoopComponent> C add(C c) {
+ components.add(c);
+
+ return c;
+ }
+}
\ No newline at end of file
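
A standalone sketch of the leader election rule in jobUpdateLeader() above: among the Hadoop-enabled nodes, the one with the minimum topology order wins, so exactly one node coordinates job updates at any topology version. The node IDs and orders here are made up for illustration:

import java.util.Arrays;
import java.util.List;

public class MinOrderLeaderSketch {
    public static void main(String[] args) {
        // {nodeId, order} pairs; orders are assigned by discovery as nodes join.
        List<long[]> nodes = Arrays.asList(new long[] {1, 7}, new long[] {2, 3}, new long[] {3, 5});

        long minOrder = Long.MAX_VALUE;
        long leaderId = -1;

        for (long[] node : nodes) {
            if (node[1] < minOrder) {
                minOrder = node[1];
                leaderId = node[0];
            }
        }

        // Node 2 joined earliest (order 3), so it acts as the job update leader.
        System.out.println("Leader: " + leaderId);
    }
}
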
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultJobInfo.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultJobInfo.java
new file mode 100644
index 0000000..2b2f3e6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultJobInfo.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop job info based on default Hadoop configuration.
+ */
+public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
+ /** */
+ private static final long serialVersionUID = 5489900236464999951L;
+
+ /** {@code true} if the job has a combiner. */
+ private boolean hasCombiner;
+
+ /** Number of reducers configured for job. */
+ private int numReduces;
+
+ /** Configuration. */
+ private Map<String,String> props = new HashMap<>();
+
+ /** Job name. */
+ private String jobName;
+
+ /** User name. */
+ private String user;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ public HadoopDefaultJobInfo() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param jobName Job name.
+ * @param user User name.
+ * @param hasCombiner {@code true} if the job has a combiner.
+ * @param numReduces Number of reducers configured for job.
+ * @param props All other properties of the job.
+ */
+ public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
+ Map<String, String> props) {
+ this.jobName = jobName;
+ this.user = user;
+ this.hasCombiner = hasCombiner;
+ this.numReduces = numReduces;
+ this.props = props;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String property(String name) {
+ return props.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+ @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
+ assert jobCls != null;
+
+ try {
+ Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
+ HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class);
+
+ return constructor.newInstance(jobId, this, log, libNames, helper);
+ }
+ catch (Throwable t) {
+ if (t instanceof Error)
+ throw (Error)t;
+
+ throw new IgniteCheckedException(t);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasCombiner() {
+ return hasCombiner;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasReducer() {
+ return reducers() > 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int reducers() {
+ return numReduces;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String jobName() {
+ return jobName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return user;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, jobName);
+ U.writeString(out, user);
+
+ out.writeBoolean(hasCombiner);
+ out.writeInt(numReduces);
+
+ U.writeStringMap(out, props);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobName = U.readString(in);
+ user = U.readString(in);
+
+ hasCombiner = in.readBoolean();
+ numReduces = in.readInt();
+
+ props = U.readStringMap(in);
+ }
+
+ /**
+ * @return Properties of the job.
+ */
+ public Map<String, String> properties() {
+ return props;
+ }
+}
\ No newline at end of file
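
Since HadoopDefaultJobInfo implements Externalizable with a manual writeExternal/readExternal pair, a quick round-trip sketch (hypothetical, same package assumed) shows how the fields survive serialization:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;

public class JobInfoRoundTripSketch {
    public static void main(String[] args) throws Exception {
        HadoopDefaultJobInfo info = new HadoopDefaultJobInfo("word-count", "ignite", true, 4,
            Collections.singletonMap("mapreduce.job.reduces", "4"));

        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(info); // Invokes writeExternal().
        }

        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            // Deserialization calls the no-arg constructor, then readExternal().
            HadoopDefaultJobInfo copy = (HadoopDefaultJobInfo)in.readObject();

            assert "word-count".equals(copy.jobName()) && copy.hasCombiner() && copy.reducers() == 4;
        }
    }
}
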
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java
new file mode 100644
index 0000000..80309df
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop facade implementation.
+ */
+public class HadoopImpl implements Hadoop {
+ /** Hadoop processor. */
+ private final HadoopProcessor proc;
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /**
+ * Constructor.
+ *
+ * @param proc Hadoop processor.
+ */
+ HadoopImpl(HadoopProcessor proc) {
+ this.proc = proc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration configuration() {
+ return proc.config();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobId nextJobId() {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.nextJobId();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.submit(jobId, jobInfo);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to submit job (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.status(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job status (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.counters(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job counters (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.finishFuture(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.kill(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to kill job (grid is stopping).");
+ }
+}
\ No newline at end of file
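
Every facade method above repeats the same guard: enter the busy lock, delegate to the processor, always leave the lock, and fail fast while the grid is stopping. A hypothetical helper factoring that pattern out might look like this (a sketch only, not part of this commit):

import java.util.concurrent.Callable;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.GridSpinBusyLock;

/** Hypothetical gate illustrating the busy-lock pattern used by HadoopImpl. */
class BusyGateSketch {
    /** Busy lock. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

    /**
     * @param errMsg Error message prefix for the stopping case.
     * @param action Action to run under the busy lock.
     * @return Action result.
     */
    <T> T call(String errMsg, Callable<T> action) throws IgniteCheckedException {
        if (busyLock.enterBusy()) {
            try {
                return action.call();
            }
            catch (Exception e) {
                throw new IgniteCheckedException(e);
            }
            finally {
                busyLock.leaveBusy();
            }
        }
        else
            throw new IllegalStateException(errMsg + " (grid is stopping).");
    }
}
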
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..9181623
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+
+/**
+ * Hadoop counter group adapter.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+ /** Counters. */
+ private final HadoopMapReduceCounters cntrs;
+
+ /** Group name. */
+ private final String name;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntrs Client counters instance.
+ * @param name Group name.
+ */
+ HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+ this.cntrs = cntrs;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addCounter(Counter counter) {
+ addCounter(counter.getName(), counter.getDisplayName(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter addCounter(String name, String displayName, long value) {
+ final Counter counter = cntrs.findCounter(this.name, name);
+
+ counter.setValue(value);
+
+ return counter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, String displayName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, boolean create) {
+ return cntrs.findCounter(name, counterName, create);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return cntrs.groupSize(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+ for (final Counter counter : rightGroup)
+ cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Counter> iterator() {
+ return cntrs.iterateGroup(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
new file mode 100644
index 0000000..1fde7d9
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Hadoop counters adapter.
+ */
+public class HadoopMapReduceCounters extends Counters {
+ /** */
+ private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
+
+ /**
+ * Creates new instance based on given counters.
+ *
+ * @param cntrs Counters to adapt.
+ */
+ public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
+ for (HadoopCounter cntr : cntrs.all())
+ if (cntr instanceof HadoopLongCounter)
+ this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+ return addGroup(grp.getName(), grp.getDisplayName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroup addGroup(String name, String displayName) {
+ return new HadoopMapReduceCounterGroup(this, name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String grpName, String cntrName) {
+ return findCounter(grpName, cntrName, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(Enum<?> key) {
+ return findCounter(key.getDeclaringClass().getName(), key.name(), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
+ return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Iterable<String> getGroupNames() {
+ Collection<String> res = new HashSet<>();
+
+ for (HadoopCounter counter : cntrs.values())
+ res.add(counter.group());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<CounterGroup> iterator() {
+ final Iterator<String> iter = getGroupNames().iterator();
+
+ return new Iterator<CounterGroup>() {
+ @Override public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override public CounterGroup next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException("not implemented");
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup getGroup(String grpName) {
+ return new HadoopMapReduceCounterGroup(this, grpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int countCounters() {
+ return cntrs.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+ for (CounterGroup group : other) {
+ for (Counter counter : group) {
+ findCounter(group.getName(), counter.getName()).increment(counter.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object genericRight) {
+ if (!(genericRight instanceof HadoopMapReduceCounters))
+ return false;
+
+ return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return cntrs.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWriteAllCounters(boolean snd) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean getWriteAllCounters() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Limits limits() {
+ return null;
+ }
+
+ /**
+ * Returns size of a group.
+ *
+ * @param grpName Name of the group.
+ * @return amount of counters in the given group.
+ */
+ public int groupSize(String grpName) {
+ int res = 0;
+
+ for (HadoopCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ res++;
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns counters iterator for specified group.
+ *
+ * @param grpName Name of the group to iterate.
+ * @return Counters iterator.
+ */
+ public Iterator<Counter> iterateGroup(String grpName) {
+ Collection<Counter> grpCounters = new ArrayList<>();
+
+ for (HadoopLongCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ grpCounters.add(new HadoopV2Counter(counter));
+ }
+
+ return grpCounters.iterator();
+ }
+
+ /**
+ * Find a counter in the group.
+ *
+ * @param grpName The name of the counter group.
+ * @param cntrName The name of the counter.
+ * @param create If {@code true}, creates the counter when it is not found.
+ * @return The counter that was found or added, or {@code null} if it is absent and {@code create} is false.
+ */
+ public Counter findCounter(String grpName, String cntrName, boolean create) {
+ T2<String, String> key = new T2<>(grpName, cntrName);
+
+ HadoopLongCounter internalCntr = cntrs.get(key);
+
+ if (internalCntr == null && create) {
+ internalCntr = new HadoopLongCounter(grpName, cntrName);
+
+ // Store the same instance that is returned so that later lookups observe increments.
+ cntrs.put(key, internalCntr);
+ }
+
+ return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
+ }
+}
\ No newline at end of file
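
A hypothetical usage sketch of the adapter above. Counters are keyed by a (group, name) pair, and with create == true a missing counter is materialized on first access; this assumes HadoopV2Counter delegates to the wrapped long counter, and the HadoopCounters argument stands for an internal counters instance obtained elsewhere (e.g. from the job tracker):

import org.apache.hadoop.mapreduce.Counter;

class CountersUsageSketch {
    static void countRecords(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters igniteCntrs) {
        HadoopMapReduceCounters cntrs = new HadoopMapReduceCounters(igniteCntrs);

        // Materializes the (group, name) counter on first access.
        Counter c = cntrs.findCounter("my-group", "records", true);

        c.increment(10);

        // A later lookup wraps the same underlying long counter.
        assert cntrs.findCounter("my-group", "records").getValue() == 10;
    }
}
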
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java
new file mode 100644
index 0000000..f4c1ebf
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopAttributes;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopClasspathUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.processors.hadoop.HadoopLocations;
+import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Hadoop processor.
+ */
+public class HadoopProcessor extends HadoopProcessorAdapter {
+ /** Job ID counter. */
+ private final AtomicInteger idCtr = new AtomicInteger();
+
+ /** Hadoop context. */
+ @GridToStringExclude
+ private HadoopContext hctx;
+
+ /** Hadoop facade for public API. */
+ @GridToStringExclude
+ private Hadoop hadoop;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+ public HadoopProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.isDaemon())
+ return;
+
+ HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+
+ if (cfg == null)
+ cfg = new HadoopConfiguration();
+ else
+ cfg = new HadoopConfiguration(cfg);
+
+ initializeDefaults(cfg);
+
+ hctx = new HadoopContext(
+ ctx,
+ cfg,
+ new HadoopJobTracker(),
+ new HadoopEmbeddedTaskExecutor(),
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
+ new HadoopShuffle());
+
+ for (HadoopComponent c : hctx.components())
+ c.start(hctx);
+
+ hadoop = new HadoopImpl(this);
+
+ ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ if (hctx == null)
+ return;
+
+ for (HadoopComponent c : hctx.components())
+ c.onKernalStart();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (hctx == null)
+ return;
+
+ List<HadoopComponent> components = hctx.components();
+
+ for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+ HadoopComponent c = it.previous();
+
+ c.onKernalStop(cancel);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
+
+ if (hctx == null)
+ return;
+
+ List<HadoopComponent> components = hctx.components();
+
+ for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+ HadoopComponent c = it.previous();
+
+ c.stop(cancel);
+ }
+ }
+
+ /**
+ * Gets Hadoop context.
+ *
+ * @return Hadoop context.
+ */
+ public HadoopContext context() {
+ return hctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Hadoop hadoop() {
+ if (hadoop == null)
+ throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
+ "is HADOOP_HOME environment variable set?)");
+
+ return hadoop;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration config() {
+ return hctx.configuration();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobId nextJobId() {
+ return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ return hctx.jobTracker().submit(jobId, jobInfo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().status(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().jobCounters(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().finishFuture(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().killJob(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateEnvironment() throws IgniteCheckedException {
+ // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here.
+ try {
+ HadoopLocations loc = HadoopClasspathUtils.locations();
+
+ if (!F.isEmpty(loc.home()))
+ U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home());
+
+ U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " +
+ loc.mapred());
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe.getMessage(), ioe);
+ }
+
+ HadoopClassLoader.hadoopUrls();
+ }
+
+ /**
+ * Initializes default hadoop configuration.
+ *
+ * @param cfg Hadoop configuration.
+ */
+ private void initializeDefaults(HadoopConfiguration cfg) {
+ if (cfg.getMapReducePlanner() == null)
+ cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcessor.class, this);
+ }
+}
\ No newline at end of file
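
Note the shutdown discipline in onKernalStop() and stop() above: components start in registration order (job tracker, task executor, shuffle) and stop in reverse via a ListIterator positioned at the end of the list. A standalone sketch of the idiom:

import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

public class ReverseStopSketch {
    public static void main(String[] args) {
        List<String> components = Arrays.asList("jobTracker", "taskExecutor", "shuffle");

        // Components started first are stopped last.
        for (ListIterator<String> it = components.listIterator(components.size()); it.hasPrevious();)
            System.out.println("Stopping " + it.previous());
        // Prints: shuffle, taskExecutor, jobTracker.
    }
}
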
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java
new file mode 100644
index 0000000..f62c999
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Scanner;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+
+/**
+ * Setup tool to configure Hadoop client.
+ */
+public class HadoopSetup {
+ /** */
+ public static final String WINUTILS_EXE = "winutils.exe";
+
+ /** */
+ private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return name.startsWith("ignite-") && name.endsWith(".jar");
+ }
+ };
+
+ /**
+ * The main method.
+ * @param ignore Params.
+ */
+ public static void main(String[] ignore) {
+ X.println(
+ " __________ ________________ ",
+ " / _/ ___/ |/ / _/_ __/ __/ ",
+ " _/ // (7 7 // / / / / _/ ",
+ "/___/\\___/_/|_/___/ /_/ /___/ ",
+ " for Apache Hadoop ",
+ " ",
+ "ver. " + ACK_VER_STR,
+ COPYRIGHT);
+
+ configureHadoop();
+ }
+
+ /**
+ * This operation prepares a clean, unpacked Hadoop distribution to work as a client with Ignite-Hadoop.
+ * It performs these operations:
+ * <ul>
+ * <li>Check that the HADOOP_HOME environment variable is set.</li>
+ * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
+ * <li>On Windows, check that winutils.exe exists and try to work around known restrictions.</li>
+ * <li>On Windows, check CMD scripts for invalid new line characters.</li>
+ * <li>Scan the Hadoop lib directory for Ignite JARs; if they are missing, try to create symbolic links.</li>
+ * </ul>
+ * </ul>
+ */
+ private static void configureHadoop() {
+ String igniteHome = U.getIgniteHome();
+
+ println("IGNITE_HOME is set to '" + igniteHome + "'.");
+
+ checkIgniteHome(igniteHome);
+
+ String homeVar = "HADOOP_HOME";
+ String hadoopHome = System.getenv(homeVar);
+
+ if (F.isEmpty(hadoopHome)) {
+ homeVar = "HADOOP_PREFIX";
+ hadoopHome = System.getenv(homeVar);
+ }
+
+ if (F.isEmpty(hadoopHome))
+ exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
+ "valid Hadoop installation directory and run setup tool again.", null);
+
+ hadoopHome = hadoopHome.replaceAll("\"", "");
+
+ println(homeVar + " is set to '" + hadoopHome + "'.");
+
+ String hiveHome = System.getenv("HIVE_HOME");
+
+ if (!F.isEmpty(hiveHome)) {
+ hiveHome = hiveHome.replaceAll("\"", "");
+
+ println("HIVE_HOME is set to '" + hiveHome + "'.");
+ }
+
+ File hadoopDir = new File(hadoopHome);
+
+ if (!hadoopDir.exists())
+ exit("Hadoop installation folder does not exist.", null);
+
+ if (!hadoopDir.isDirectory())
+ exit("HADOOP_HOME must point to a directory.", null);
+
+ if (!hadoopDir.canRead())
+ exit("Hadoop installation folder can not be read. Please check permissions.", null);
+
+ final File hadoopCommonDir;
+
+ String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
+
+ if (F.isEmpty(hadoopCommonHome)) {
+ hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
+
+ println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
+ }
+ else {
+ println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
+
+ hadoopCommonDir = new File(hadoopCommonHome);
+ }
+
+ if (!hadoopCommonDir.canRead())
+ exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null);
+
+ final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
+
+ if (!hadoopCommonLibDir.canRead())
+ exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
+
+ if (U.isWindows()) {
+ checkJavaPathSpaces();
+
+ final File hadoopBinDir = new File(hadoopDir, "bin");
+
+ if (!hadoopBinDir.canRead())
+ exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
+
+ File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
+
+ if (!winutilsFile.exists()) {
+ if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
+ "It may be replaced by a stub. Create it?")) {
+ println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
+
+ boolean ok = false;
+
+ try {
+ ok = winutilsFile.createNewFile();
+ }
+ catch (IOException ignore) {
+ // No-op.
+ }
+
+ if (!ok)
+ exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
+ }
+ else
+ println("Ok. But Hadoop client probably will not work on Windows this way...");
+ }
+
+ processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
+ }
+
+ File igniteLibs = new File(new File(igniteHome), "libs");
+
+ if (!igniteLibs.exists())
+ exit("Ignite 'libs' folder is not found.", null);
+
+ Collection<File> jarFiles = new ArrayList<>();
+
+ addJarsInFolder(jarFiles, igniteLibs);
+ addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
+ addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
+
+ boolean jarsLinksCorrect = true;
+
+ for (File file : jarFiles) {
+ File link = new File(hadoopCommonLibDir, file.getName());
+
+ jarsLinksCorrect &= isJarLinkCorrect(link, file);
+
+ if (!jarsLinksCorrect)
+ break;
+ }
+
+ if (!jarsLinksCorrect) {
+ if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
+ "Create appropriate symbolic links?")) {
+ File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);
+
+ if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
+ "installation. They must be deleted to continue. Continue?")) {
+ for (File file : oldIgniteJarFiles) {
+ println("Deleting file '" + file.getAbsolutePath() + "'.");
+
+ if (!file.delete())
+ exit("Failed to delete file '" + file.getPath() + "'.", null);
+ }
+ }
+
+ for (File file : jarFiles) {
+ File targetFile = new File(hadoopCommonLibDir, file.getName());
+
+ try {
+ println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");
+
+ Files.createSymbolicLink(targetFile.toPath(), file.toPath());
+ }
+ catch (IOException e) {
+ if (U.isWindows()) {
+ warn("Ability to create symbolic links is required!");
+ warn("On Windows platform you have to grant permission 'Create symbolic links'");
+ warn("to your user or run the Accelerator as Administrator.");
+ }
+
+ exit("Creating symbolic link failed! Check permissions.", e);
+ }
+ }
+ }
+ else
+ println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
+ }
+
+ File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");
+
+ File igniteHadoopCfg = igniteHadoopConfig(igniteHome);
+
+ if (!igniteHadoopCfg.canRead())
+ exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null);
+
+ if (hadoopEtc.canWrite()) { // TODO Bigtop
+ if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
+ "(existing files will be backed up)?")) {
+ replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"),
+ new File(hadoopEtc, "core-site.xml"));
+
+ replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"),
+ new File(hadoopEtc, "mapred-site.xml"));
+ }
+ else
+ println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
+ }
+
+ if (!F.isEmpty(hiveHome)) {
+ File hiveConfDir = new File(hiveHome + File.separator + "conf");
+
+ if (!hiveConfDir.canWrite())
+ warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
+ "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
+ else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
+ replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"),
+ new File(hiveConfDir, "hive-site.xml"));
+ else
+ println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
+ }
+
+ println("Apache Hadoop setup is complete.");
+ }
+
+ /**
+ * Get Ignite Hadoop config directory.
+ *
+ * @param igniteHome Ignite home.
+ * @return Ignite Hadoop config directory.
+ */
+ private static File igniteHadoopConfig(String igniteHome) {
+ Path path = Paths.get(igniteHome, "modules", "hadoop", "config");
+
+ if (!Files.exists(path))
+ path = Paths.get(igniteHome, "config", "hadoop");
+
+ if (Files.exists(path))
+ return path.toFile();
+ else
+ return new File(igniteHome, "docs");
+ }
+
+ /**
+ * @param jarFiles Jars.
+ * @param folder Folder.
+ */
+ private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
+ if (!folder.exists())
+ exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
+
+ jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS)));
+ }
+
+ /**
+ * Checks that JAVA_HOME does not contain space characters.
+ */
+ private static void checkJavaPathSpaces() {
+ String javaHome = System.getProperty("java.home");
+
+ if (javaHome.contains(" ")) {
+ warn("Java installation path contains space characters!");
+ warn("Hadoop client will not be able to start using '" + javaHome + "'.");
+ warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
+ }
+ }
+
+ /**
+ * Checks Ignite home.
+ *
+ * @param igniteHome Ignite home.
+ */
+ private static void checkIgniteHome(String igniteHome) {
+ URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();
+
+ try {
+ Path jar = Paths.get(jarUrl.toURI());
+ Path igHome = Paths.get(igniteHome);
+
+ if (!jar.startsWith(igHome))
+ exit("Ignite JAR files are not under IGNITE_HOME.", null);
+ }
+ catch (Exception e) {
+ exit(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Replaces target file with source file.
+ *
+ * @param from From.
+ * @param to To.
+ */
+ private static void replaceWithBackup(File from, File to) {
+ if (!from.canRead())
+ exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
+
+ println("Replacing file '" + to.getAbsolutePath() + "'.");
+
+ try {
+ U.copy(from, renameToBak(to), true);
+ }
+ catch (IOException e) {
+ exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
+ }
+ }
+
+ /**
+ * Renames file for backup.
+ *
+ * @param file File.
+ * @return File.
+ */
+ private static File renameToBak(File file) {
+ DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
+
+ if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
+ exit("Failed to rename file '" + file.getPath() + "'.", null);
+
+ return file;
+ }
+
+ /**
+ * Checks if link is correct.
+ *
+ * @param link Symbolic link.
+ * @param correctTarget Correct link target.
+ * @return {@code true} If link target is correct.
+ */
+ private static boolean isJarLinkCorrect(File link, File correctTarget) {
+ if (!Files.isSymbolicLink(link.toPath()))
+ return false; // It is a real file or it does not exist.
+
+ Path target = null;
+
+ try {
+ target = Files.readSymbolicLink(link.toPath());
+ }
+ catch (IOException e) {
+ exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
+ }
+
+ return Files.exists(target) && target.toFile().equals(correctTarget);
+ }
+
+ /**
+ * Writes the question and reads the boolean answer from the console.
+ *
+ * @param question Question to write.
+ * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
+ */
+ private static boolean ask(String question) {
+ X.println();
+ X.print(" < " + question + " (Y/N): ");
+
+ String answer = null;
+
+ if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
+ answer = "Y";
+ else {
+ BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+ try {
+ answer = br.readLine();
+ }
+ catch (IOException e) {
+ exit("Failed to read answer: " + e.getMessage(), e);
+ }
+ }
+
+ if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
+ X.println(" > Yes.");
+
+ return true;
+ }
+ else {
+ X.println(" > No.");
+
+ return false;
+ }
+ }
+
+ /**
+ * Exit with message.
+ *
+ * @param msg Exit message.
+ * @param e Optional exception; its stack trace is printed when IGNITE_HADOOP_SETUP_DEBUG is set.
+ */
+ private static void exit(String msg, Exception e) {
+ X.println(" ");
+ X.println(" # " + msg);
+ X.println(" # Setup failed, exiting... ");
+
+ if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
+ e.printStackTrace();
+
+ System.exit(1);
+ }
+
+ /**
+ * Prints message.
+ *
+ * @param msg Message.
+ */
+ private static void println(String msg) {
+ X.println(" > " + msg);
+ }
+
+ /**
+ * Prints warning.
+ *
+ * @param msg Message.
+ */
+ private static void warn(String msg) {
+ X.println(" ! " + msg);
+ }
+
+ /**
+ * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the
+ * answer. If it's 'Y' then backs up the original files and corrects invalid new line characters.
+ *
+ * @param rootDir Root directory to process.
+ * @param dirs Directories inside of the root to process.
+ */
+ private static void processCmdFiles(File rootDir, String... dirs) {
+ boolean answer = false;
+
+ for (String dir : dirs) {
+ File subDir = new File(rootDir, dir);
+
+ File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(".cmd");
+ }
+ });
+
+ if (cmdFiles == null)
+ continue;
+
+ for (File file : cmdFiles) {
+ String content = null;
+
+ try (Scanner scanner = new Scanner(file)) {
+ content = scanner.useDelimiter("\\Z").next();
+ }
+ catch (FileNotFoundException e) {
+ exit("Failed to read file '" + file + "'.", e);
+ }
+
+ boolean invalid = false;
+
+ for (int i = 0; i < content.length(); i++) {
+ if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
+ invalid = true;
+
+ break;
+ }
+ }
+
+ if (invalid) {
+ answer = answer || ask("One or more *.CMD files have invalid new line characters. Replace them?");
+
+ if (!answer) {
+ println("Ok. But Windows most probably will fail to execute them...");
+
+ return;
+ }
+
+ println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
+
+ renameToBak(file);
+
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+ for (int i = 0; i < content.length(); i++) {
+ if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
+ writer.write("\r");
+
+ writer.write(content.charAt(i));
+ }
+ }
+ catch (IOException e) {
+ exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
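
The newline check and fix in processCmdFiles() above hinge on one rule: a '\n' that is not preceded by '\r' marks a Unix line ending, and the fix inserts the missing '\r'. A standalone sketch of that normalization:

public class CrlfFixSketch {
    /** Inserts '\r' before every bare '\n', leaving existing "\r\n" pairs intact. */
    static String toCrlf(String content) {
        StringBuilder sb = new StringBuilder(content.length());

        for (int i = 0; i < content.length(); i++) {
            if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
                sb.append('\r');

            sb.append(content.charAt(i));
        }

        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCrlf("echo off\nrem x\n").equals("echo off\r\nrem x\r\n"));   // true
        System.out.println(toCrlf("fine\r\nalready\r\n").equals("fine\r\nalready\r\n"));   // true
    }
}
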
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java
new file mode 100644
index 0000000..85df551
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskCancelledException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteException;
+
+/**
+ * Exception thrown when a task is cancelled.
+ */
+public class HadoopTaskCancelledException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param msg Exception message.
+ */
+ public HadoopTaskCancelledException(String msg) {
+ super(msg);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
new file mode 100644
index 0000000..ef5841d
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop utility methods.
+ */
+public class HadoopUtils {
+ /** Property to store timestamp of new job id request. */
+ public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
+
+ /** Property to store timestamp of the response to the new job id request. */
+ public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
+
+ /** Property to store timestamp of job submission. */
+ public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
+
+ /** Property to set custom writer of job statistics. */
+ public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
+
+ /** Staging constant. */
+ private static final String STAGING_CONSTANT = ".staging";
+
+ /** Old mapper class attribute. */
+ private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
+
+ /** Old reducer class attribute. */
+ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+
+ /**
+ * Private constructor to prevent instantiation of this utility class.
+ */
+ private HadoopUtils() {
+ // No-op.
+ }
+
+ /**
+ * Wraps native split.
+ *
+ * @param id Split ID.
+ * @param split Split.
+ * @param hosts Hosts.
+ * @return Wrapped split.
+ * @throws IOException If failed.
+ */
+ public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
+ ByteArrayOutputStream arr = new ByteArrayOutputStream();
+ ObjectOutput out = new ObjectOutputStream(arr);
+
+ assert split instanceof Writable;
+
+ ((Writable)split).write(out);
+
+ out.flush();
+
+ return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
+ }
+
+ /**
+ * Unwraps native split.
+ *
+ * @param o Wrapper.
+ * @return Split.
+ */
+ public static Object unwrapSplit(HadoopSplitWrapper o) {
+ try {
+ Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
+
+ w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
+
+ return w;
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Converts Ignite job status to Hadoop job status.
+ *
+ * @param status Ignite job status.
+ * @param conf Hadoop configuration.
+ * @return Hadoop job status.
+ */
+ public static JobStatus status(HadoopJobStatus status, Configuration conf) {
+ JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
+
+ float setupProgress = 0;
+ float mapProgress = 0;
+ float reduceProgress = 0;
+ float cleanupProgress = 0;
+
+ JobStatus.State state = JobStatus.State.RUNNING;
+
+ switch (status.jobPhase()) {
+ case PHASE_SETUP:
+ setupProgress = 0.42f;
+
+ break;
+
+ case PHASE_MAP:
+ setupProgress = 1;
+ mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
+
+ break;
+
+ case PHASE_REDUCE:
+ setupProgress = 1;
+ mapProgress = 1;
+
+ if (status.totalReducerCnt() > 0)
+ reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+ else
+ reduceProgress = 1f;
+
+ break;
+
+ case PHASE_CANCELLING:
+ case PHASE_COMPLETE:
+ if (!status.isFailed()) {
+ setupProgress = 1;
+ mapProgress = 1;
+ reduceProgress = 1;
+ cleanupProgress = 1;
+
+ state = JobStatus.State.SUCCEEDED;
+ }
+ else
+ state = JobStatus.State.FAILED;
+
+ break;
+
+ default:
+ assert false;
+ }
+
+ return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
+ JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
+ }
+
+ /**
+ * Gets staging area directory.
+ *
+ * @param conf Configuration.
+ * @param usr User.
+ * @return Staging area directory.
+ */
+ public static Path stagingAreaDir(Configuration conf, String usr) {
+ return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
+ }
+
+ /**
+ * Gets job file.
+ *
+ * @param conf Configuration.
+ * @param usr User.
+ * @param jobId Job ID.
+ * @return Job file.
+ */
+ public static Path jobFile(Configuration conf, String usr, JobID jobId) {
+ return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
+ }
+
+ /**
+ * Checks that the given attribute is not set in the configuration.
+ *
+ * @param cfg Configuration to check.
+ * @param attr Attribute name.
+ * @param msg Message for creation of exception.
+ * @throws IgniteCheckedException If attribute is set.
+ */
+ public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
+ if (cfg.get(attr) != null)
+ throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
+ }
+
+ /**
+ * Creates JobInfo from Hadoop configuration.
+ *
+ * @param cfg Hadoop configuration.
+ * @return Job info.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+ JobConf jobConf = new JobConf(cfg);
+
+ boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
+ || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
+
+ int numReduces = jobConf.getNumReduceTasks();
+
+ jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
+
+ if (jobConf.getUseNewMapper()) {
+ String mode = "new map API";
+
+ ensureNotSet(jobConf, "mapred.input.format.class", mode);
+ ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
+
+ if (numReduces != 0)
+ ensureNotSet(jobConf, "mapred.partitioner.class", mode);
+ else
+ ensureNotSet(jobConf, "mapred.output.format.class", mode);
+ }
+ else {
+ String mode = "map compatibility";
+
+ ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
+
+ if (numReduces != 0)
+ ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
+ else
+ ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
+ }
+
+ if (numReduces != 0) {
+ jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
+
+ if (jobConf.getUseNewReducer()) {
+ String mode = "new reduce API";
+
+ ensureNotSet(jobConf, "mapred.output.format.class", mode);
+ ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
+ }
+ else {
+ String mode = "reduce compatibility";
+
+ ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
+ }
+ }
+
+ Map<String, String> props = new HashMap<>();
+
+ for (Map.Entry<String, String> entry : jobConf)
+ props.put(entry.getKey(), entry.getValue());
+
+ return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+ }
+
+ /**
+ * Creates a new {@link IgniteCheckedException} with the original exception serialized into a string.
+ * This is needed to transfer the error outside the current class loader.
+ *
+ * @param e Original exception.
+ * @return New exception.
+ */
+ public static IgniteCheckedException transformException(Throwable e) {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+ e.printStackTrace(new PrintStream(os, true));
+
+ return new IgniteCheckedException(os.toString());
+ }
+
+ /**
+ * Returns work directory for job execution.
+ *
+ * @param locNodeId Local node ID.
+ * @param jobId Job ID.
+ * @return Working directory for job.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
+ return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
+ }
+
+ /**
+ * Returns subdirectory of job working directory for task execution.
+ *
+ * @param locNodeId Local node ID.
+ * @param info Task info.
+ * @return Working directory for task.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
+ File jobLocDir = jobLocalDir(locNodeId, info.jobId());
+
+ return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
+ }
+
+ /**
+ * Creates {@link Configuration} in the correct class loader context to avoid caching
+ * an inappropriate class loader in the Configuration object.
+ *
+ * @return New instance of {@link Configuration}.
+ */
+ public static Configuration safeCreateConfiguration() {
+ final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader());
+
+ try {
+ return new Configuration();
+ }
+ finally {
+ restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /**
+ * Set context class loader.
+ *
+ * @param newLdr New class loader.
+ * @return Old class loader.
+ */
+ @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) {
+ ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();
+
+ if (newLdr != oldLdr)
+ Thread.currentThread().setContextClassLoader(newLdr);
+
+ return oldLdr;
+ }
+
+ /**
+ * Restore context class loader.
+ *
+ * @param oldLdr Original class loader.
+ */
+ public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) {
+ ClassLoader newLdr = Thread.currentThread().getContextClassLoader();
+
+ if (newLdr != oldLdr)
+ Thread.currentThread().setContextClassLoader(oldLdr);
+ }
+}
\ No newline at end of file
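Note how wrapSplit() captures the split's class name plus its Writable-serialized bytes, while unwrapSplit() reflectively recreates the instance and replays the bytes through readFields(). A hedged round-trip sketch, assuming Hadoop's org.apache.hadoop.mapreduce.lib.input.FileSplit (which implements Writable and has the no-arg constructor unwrapSplit() requires); the path and host are made up, and the snippet belongs in a method that declares IOException:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Serialize a native Hadoop split into the Ignite wrapper and back.
    FileSplit split = new FileSplit(new Path("/data/part-0"), 0, 1024, new String[] {"host1"});

    HadoopSplitWrapper wrapped = HadoopUtils.wrapSplit(0, split, new String[] {"host1"});
    FileSplit restored = (FileSplit)HadoopUtils.unwrapSplit(wrapped);

    // The Writable fields (path, start, length) survive the round trip.
    assert restored.getPath().equals(split.getPath());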
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCounterAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCounterAdapter.java
new file mode 100644
index 0000000..3f682d3
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCounterAdapter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Default Hadoop counter implementation.
+ */
+public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Counter group name. */
+ private String grp;
+
+ /** Counter name. */
+ private String name;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ protected HadoopCounterAdapter() {
+ // No-op.
+ }
+
+ /**
+ * Creates new counter with given group and name.
+ *
+ * @param grp Counter group name.
+ * @param name Counter name.
+ */
+ protected HadoopCounterAdapter(String grp, String name) {
+ assert grp != null : "counter must have group";
+ assert name != null : "counter must have name";
+
+ this.grp = grp;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public String group() {
+ return grp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(grp);
+ out.writeUTF(name);
+ writeValue(out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ grp = in.readUTF();
+ name = in.readUTF();
+ readValue(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ HadoopCounterAdapter cntr = (HadoopCounterAdapter)o;
+
+ if (!grp.equals(cntr.grp))
+ return false;
+ if (!name.equals(cntr.name))
+ return false;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = grp.hashCode();
+ res = 31 * res + name.hashCode();
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopCounterAdapter.class, this);
+ }
+
+ /**
+ * Writes value of this counter to output.
+ *
+ * @param out Output.
+ * @throws IOException If failed.
+ */
+ protected abstract void writeValue(ObjectOutput out) throws IOException;
+
+ /**
+ * Read value of this counter from input.
+ *
+ * @param in Input.
+ * @throws IOException If failed.
+ */
+ protected abstract void readValue(ObjectInput in) throws IOException;
+}
\ No newline at end of file
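A concrete counter only needs to hold its value, implement merge() (which the HadoopCounter interface evidently declares, given its use in the counters store shown next), and supply the two serialization hooks. The (group, name) constructor matters because the store creates counters reflectively through exactly that signature. A hedged sketch of a minimal long-valued subclass (LongValueCounter is hypothetical, not one of Ignite's shipped counter classes), using the same java.io imports as the adapter above:

    /** Hypothetical counter holding a single long value. */
    public class LongValueCounter extends HadoopCounterAdapter {
        /** */
        private static final long serialVersionUID = 0L;

        /** Counter value. */
        private long val;

        /** Empty constructor required by Externalizable. */
        public LongValueCounter() {
            // No-op.
        }

        /** The (group, name) signature that the counters store reflects on. */
        public LongValueCounter(String grp, String name) {
            super(grp, name);
        }

        /** @return Current value. */
        public long value() {
            return val;
        }

        /** Adds the given delta to the value. */
        public void increment(long delta) {
            val += delta;
        }

        /** {@inheritDoc} */
        @Override public void merge(HadoopCounter cntr) {
            val += ((LongValueCounter)cntr).val;
        }

        /** {@inheritDoc} */
        @Override protected void writeValue(ObjectOutput out) throws IOException {
            out.writeLong(val);
        }

        /** {@inheritDoc} */
        @Override protected void readValue(ObjectInput in) throws IOException {
            val = in.readLong();
        }
    }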
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCountersImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCountersImpl.java
new file mode 100644
index 0000000..f3b5463
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopCountersImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Default in-memory counters store.
+ */
+public class HadoopCountersImpl implements HadoopCounters, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
+
+ /**
+ * Default constructor. Creates new instance without counters.
+ */
+ public HadoopCountersImpl() {
+ // No-op.
+ }
+
+ /**
+ * Creates new instance that contains the given counters.
+ *
+ * @param cntrs Counters to store.
+ */
+ public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) {
+ addCounters(cntrs, true);
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param cntrs Counters to copy.
+ */
+ public HadoopCountersImpl(HadoopCounters cntrs) {
+ this(cntrs.all());
+ }
+
+ /**
+ * Creates counter instance.
+ *
+ * @param cls Class of the counter.
+ * @param grp Group name.
+ * @param name Counter name.
+ * @return Counter.
+ */
+ private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp,
+ String name) {
+ try {
+ Constructor constructor = cls.getConstructor(String.class, String.class);
+
+ return (T)constructor.newInstance(grp, name);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Adds a collection of counters to the existing counters.
+ *
+ * @param cntrs Counters to add.
+ * @param cp Whether to copy counters or not.
+ */
+ private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) {
+ assert cntrs != null;
+
+ for (HadoopCounter cntr : cntrs) {
+ if (cp) {
+ HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
+
+ cntrCp.merge(cntr);
+
+ cntr = cntrCp;
+ }
+
+ cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+ assert cls != null;
+
+ CounterKey mapKey = new CounterKey(cls, grp, name);
+
+ T cntr = (T)cntrsMap.get(mapKey);
+
+ if (cntr == null) {
+ cntr = createCounter(cls, grp, name);
+
+ T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
+
+ if (old != null)
+ return old;
+ }
+
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<HadoopCounter> all() {
+ return cntrsMap.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void merge(HadoopCounters other) {
+ for (HadoopCounter counter : other.all())
+ counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeCollection(out, cntrsMap.values());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ addCounters(U.<HadoopCounter>readCollection(in), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ HadoopCountersImpl counters = (HadoopCountersImpl)o;
+
+ return cntrsMap.equals(counters.cntrsMap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return cntrsMap.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values());
+ }
+
+ /**
+ * Counter map key: a tuple of the counter identifier components (class, group, name).
+ */
+ private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ *
+ * @param cls Class of the counter.
+ * @param grp Group name.
+ * @param name Counter name.
+ */
+ private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) {
+ super(cls, grp, name);
+ }
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public CounterKey() {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
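Putting the pieces together: counter() is a get-or-create operation keyed by the (class, group, name) triple, and merge() folds another store into this one counter by counter. A hedged usage sketch reusing the hypothetical LongValueCounter from the previous example (group and counter names are made up):

    HadoopCountersImpl mapCntrs = new HadoopCountersImpl();
    HadoopCountersImpl reduceCntrs = new HadoopCountersImpl();

    // The same (class, group, name) triple always resolves to the same counter instance.
    mapCntrs.counter("io", "bytesRead", LongValueCounter.class).increment(1024);
    reduceCntrs.counter("io", "bytesRead", LongValueCounter.class).increment(512);

    // Fold the reducer-side counters into the mapper-side store: the value becomes 1536.
    mapCntrs.merge(reduceCntrs);

    long total = mapCntrs.counter("io", "bytesRead", LongValueCounter.class).value();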