You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:26 UTC
[15/51] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..24f10a6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends HadoopTaskContext {
+    /** Whether {@link JobContext} declares the combiner key grouping comparator accessor. */
+    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+    /**
+     * Probes for combiner grouping support (available since Hadoop 2.3) by looking
+     * the accessor method up reflectively.
+     */
+    static {
+        boolean supported = true;
+
+        try {
+            JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+        }
+        catch (NoSuchMethodException ignored) {
+            // The method is absent in this Hadoop version.
+            supported = false;
+        }
+
+        COMBINE_KEY_GROUPING_SUPPORTED = supported;
+    }
+
+ /** Flag is set if new context-object code is used for running the mapper. */
+ private final boolean useNewMapper;
+
+ /** Flag is set if new context-object code is used for running the reducer. */
+ private final boolean useNewReducer;
+
+ /** Flag is set if new context-object code is used for running the combiner. */
+ private final boolean useNewCombiner;
+
+ /** Hadoop job context built in the constructor around the job configuration read from the input. */
+ private final JobContextImpl jobCtx;
+
+ /** Set once cancellation of the task has been requested. */
+ private volatile boolean cancelled;
+
+ /** Task currently being run; {@code null} when no task is running. */
+ private volatile HadoopTask task;
+
+ /** Local node ID. */
+ private UUID locNodeId;
+
+ /** Counters for task. */
+ private final HadoopCounters cntrs = new HadoopCountersImpl();
+
+    /**
+     * Creates the task context and reads the job configuration from the given input.
+     *
+     * @param taskInfo Task info.
+     * @param job Job.
+     * @param jobId Job ID.
+     * @param locNodeId Local node ID.
+     * @param jobConfDataInput DataInput to read the JobConf from.
+     * @throws IgniteCheckedException If the job configuration could not be read.
+     */
+    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+        super(taskInfo, job);
+
+        this.locNodeId = locNodeId;
+
+        Thread curThread = Thread.currentThread();
+
+        // Remember the caller's context class loader so that it can be put back afterwards.
+        ClassLoader oldLdr = curThread.getContextClassLoader();
+
+        // The context class loader has to be switched before the JobConf instance is created.
+        curThread.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            JobConf jobConf = new JobConf();
+
+            try {
+                jobConf.readFields(jobConfDataInput);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+
+            // For map-reduce jobs prefer local writes.
+            jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+            jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+            useNewMapper = jobConf.getUseNewMapper();
+            useNewReducer = jobConf.getUseNewReducer();
+            useNewCombiner = jobConf.getCombinerClass() == null;
+        }
+        finally {
+            // Restore the previous loader instead of nulling it out, so the calling thread is left as it was.
+            curThread.setContextClassLoader(oldLdr);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+ return cntrs.counter(grp, name, cls);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopCounters counters() {
+ return cntrs;
+ }
+
+    /**
+     * Instantiates the task implementation matching the type of the current task info.
+     * The v1 or v2 flavour is picked according to the mapper/reducer/combiner API flags.
+     *
+     * @return Task, or {@code null} if the task type is not handled here.
+     */
+    private HadoopTask createTask() {
+        HadoopTaskType type = taskInfo().type();
+
+        switch (type) {
+            case SETUP:
+                if (useNewMapper)
+                    return new HadoopV2SetupTask(taskInfo());
+
+                return new HadoopV1SetupTask(taskInfo());
+
+            case MAP:
+                if (useNewMapper)
+                    return new HadoopV2MapTask(taskInfo());
+
+                return new HadoopV1MapTask(taskInfo());
+
+            case REDUCE:
+                if (useNewReducer)
+                    return new HadoopV2ReduceTask(taskInfo(), true);
+
+                return new HadoopV1ReduceTask(taskInfo(), true);
+
+            case COMBINE:
+                if (useNewCombiner)
+                    return new HadoopV2ReduceTask(taskInfo(), false);
+
+                return new HadoopV1ReduceTask(taskInfo(), false);
+
+            case COMMIT:
+            case ABORT: {
+                // Commit and abort share the cleanup task; the flag tells them apart.
+                boolean abort = type == HadoopTaskType.ABORT;
+
+                if (useNewReducer)
+                    return new HadoopV2CleanupTask(taskInfo(), abort);
+
+                return new HadoopV1CleanupTask(taskInfo(), abort);
+            }
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates the task for the current task info and runs it with the job's class loader installed
+     * as the thread context class loader. The loader that was active on entry is restored on exit.
+     *
+     * @throws IgniteCheckedException If task creation or execution failed, or the task was cancelled.
+     */
+    @Override public void run() throws IgniteCheckedException {
+        Thread curThread = Thread.currentThread();
+
+        // Remember the caller's context class loader so that it can be put back afterwards.
+        ClassLoader oldLdr = curThread.getContextClassLoader();
+
+        try {
+            curThread.setContextClassLoader(jobConf().getClassLoader());
+
+            try {
+                task = createTask();
+            }
+            catch (Throwable e) {
+                throw transformException(e);
+            }
+
+            // Cancellation may have been requested before the task was created.
+            if (cancelled)
+                throw new HadoopTaskCancelledException("Task cancelled.");
+
+            try {
+                task.run(this);
+            }
+            catch (Throwable e) {
+                throw transformException(e);
+            }
+        }
+        finally {
+            task = null;
+
+            // Restore the previous loader instead of nulling it out, so the calling thread is left as it was.
+            curThread.setContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+
+        // Read the volatile field once: it may be reset to null concurrently.
+        HadoopTask cur = task;
+
+        if (cur == null)
+            return;
+
+        cur.cancel();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resolves the local directory for the task (task-local for map and reduce tasks, job-local
+     * otherwise), sets the user on the job file system and points the local file system's working
+     * directory at the resolved directory. The thread context class loader active on entry is
+     * restored on exit.
+     *
+     * @throws IgniteCheckedException If the environment could not be prepared.
+     */
+    @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+        File locDir;
+
+        switch (taskInfo().type()) {
+            case MAP:
+            case REDUCE:
+                job().prepareTaskEnvironment(taskInfo());
+
+                locDir = taskLocalDir(locNodeId, taskInfo());
+
+                break;
+
+            default:
+                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+        }
+
+        Thread curThread = Thread.currentThread();
+
+        // Remember the caller's context class loader so that it can be put back afterwards.
+        ClassLoader oldLdr = curThread.getContextClassLoader();
+
+        curThread.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            FileSystem fs = FileSystem.get(jobConf());
+
+            HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+
+            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+        }
+        catch (Throwable e) {
+            throw transformException(e);
+        }
+        finally {
+            // Restore the previous loader instead of nulling it out, so the calling thread is left as it was.
+            curThread.setContextClassLoader(oldLdr);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+ job().cleanupTaskEnvironment(taskInfo());
+ }
+
+    /**
+     * Builds the Hadoop attempt ID from the job ID, the mapped task type, the task number
+     * and the attempt number of the current task info.
+     *
+     * @return Attempt ID.
+     */
+    public TaskAttemptID attemptId() {
+        return new TaskAttemptID(
+            new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber()),
+            taskInfo().attempt());
+    }
+
+    /**
+     * Maps an Ignite task type to the corresponding Hadoop task type.
+     *
+     * @param type Task type.
+     * @return Hadoop task type, or {@code null} if there is no mapping for the given type.
+     */
+    private TaskType taskType(HadoopTaskType type) {
+        TaskType res;
+
+        switch (type) {
+            case SETUP:
+                res = TaskType.JOB_SETUP;
+
+                break;
+
+            case MAP:
+            case COMBINE:
+                // Combine tasks are reported as map tasks.
+                res = TaskType.MAP;
+
+                break;
+
+            case REDUCE:
+                res = TaskType.REDUCE;
+
+                break;
+
+            case COMMIT:
+            case ABORT:
+                res = TaskType.JOB_CLEANUP;
+
+                break;
+
+            default:
+                res = null;
+        }
+
+        return res;
+    }
+
+ /**
+ * Gets job configuration of the task.
+ *
+ * @return Job configuration.
+ */
+ public JobConf jobConf() {
+ return jobCtx.getJobConf();
+ }
+
+ /**
+ * Gets job context of the task.
+ *
+ * @return Job context.
+ */
+ public JobContextImpl jobContext() {
+ return jobCtx;
+ }
+
+    /** {@inheritDoc} */
+    @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+        // The old-API property decides which partitioner flavour is used.
+        boolean oldApi = jobConf().getClass("mapred.partitioner.class", null) != null;
+
+        if (!oldApi) {
+            try {
+                return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
+            }
+            catch (ClassNotFoundException e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+
+        return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
+    }
+
+    /**
+     * Resolves the serialization to use for the given class.
+     * {@link Writable} classes handled by {@link WritableSerialization} get the dedicated
+     * {@link HadoopWritableSerialization}; anything else is wrapped generically.
+     *
+     * @param cls Class.
+     * @param jobConf Job configuration.
+     * @return Appropriate serializer.
+     * @throws IgniteCheckedException If no serialization is registered for the class.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+        A.notNull(cls, "cls");
+
+        Serialization<?> ser = new SerializationFactory(jobConf).getSerialization(cls);
+
+        if (ser == null)
+            throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+        if (ser.getClass() != WritableSerialization.class)
+            return new HadoopSerializationWrapper(ser, cls);
+
+        return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+    }
+
+ /** {@inheritDoc} */
+ @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+ return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+ return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Comparator<Object> sortComparator() {
+ return (Comparator<Object>)jobCtx.getSortComparator();
+ }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> groupComparator() {
+        Comparator<?> grpCmp;
+
+        switch (taskInfo().type()) {
+            case REDUCE:
+                grpCmp = jobContext().getGroupingComparator();
+
+                break;
+
+            case COMBINE:
+                // Use the dedicated combiner comparator only where the Hadoop version provides it.
+                if (COMBINE_KEY_GROUPING_SUPPORTED)
+                    grpCmp = jobContext().getCombinerKeyGroupingComparator();
+                else
+                    grpCmp = jobContext().getGroupingComparator();
+
+                break;
+
+            default:
+                return null;
+        }
+
+        // A grouping comparator of the same class as the sort comparator is reported as absent.
+        boolean differs = grpCmp != null && grpCmp.getClass() != sortComparator().getClass();
+
+        return differs ? (Comparator<Object>)grpCmp : null;
+    }
+
+    /**
+     * Converts an Ignite input split into the native Hadoop split object.
+     * External splits are read from the job split file, wrapped splits are unwrapped.
+     *
+     * @param split Split.
+     * @return Native Hadoop split.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        Object nativeSplit;
+
+        if (split instanceof HadoopExternalSplit)
+            nativeSplit = readExternalSplit((HadoopExternalSplit)split);
+        else if (split instanceof HadoopSplitWrapper)
+            nativeSplit = unwrapSplit((HadoopSplitWrapper)split);
+        else
+            throw new IllegalStateException("Unknown split: " + split);
+
+        return nativeSplit;
+    }
+
+    /**
+     * Reads a native input split from the job split file located in the job directory.
+     * The split is stored as its class name followed by the serialized split, starting
+     * at the offset carried by the external split.
+     *
+     * @param split External split.
+     * @return Native input split.
+     * @throws IgniteCheckedException If the split file could not be read, the split class could not
+     *      be loaded or no serialization is registered for the split class.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+        // NOTE(review): FileSystem.get() may return a shared cached instance; closing it here
+        // could affect other users of the same file system - confirm.
+        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+            FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+
+            in.seek(split.offset());
+
+            String clsName = Text.readString(in);
+
+            Class<?> cls = jobConf().getClassByName(clsName);
+
+            assert cls != null;
+
+            Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+            // Fail with a descriptive checked exception rather than an NPE on the next line.
+            if (serialization == null)
+                throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+            Deserializer deserializer = serialization.getDeserializer(cls);
+
+            deserializer.open(in);
+
+            Object res = deserializer.deserialize(null);
+
+            deserializer.close();
+
+            assert res != null;
+
+            return res;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..3920dd5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.io.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * {@link HadoopSerialization} for Hadoop {@link Writable} types that calls the
+ * {@link Writable} read and write methods directly.
+ */
+public class HadoopWritableSerialization implements HadoopSerialization {
+    /** Writable class handled by this serialization. */
+    private final Class<? extends Writable> cls;
+
+    /**
+     * @param cls Writable class to serialize and deserialize.
+     */
+    public HadoopWritableSerialization(Class<? extends Writable> cls) {
+        assert cls != null;
+
+        this.cls = cls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+        Writable writable = (Writable)obj;
+
+        try {
+            writable.write(out);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        Writable target;
+
+        // Reuse the supplied instance when there is one, otherwise create a fresh one.
+        if (obj != null)
+            target = cls.cast(obj);
+        else
+            target = U.newInstance(cls);
+
+        try {
+            target.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Nothing to release.
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
index fe35d5e..8d5957b 100644
--- a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
+++ b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -1 +1 @@
-org.apache.ignite.client.hadoop.GridHadoopClientProtocolProvider
+org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java
deleted file mode 100644
index 780ce67..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Hadoop client protocol tests in embedded process mode.
- */
-public class GridHadoopClientProtocolEmbeddedSelfTest extends GridHadoopClientProtocolSelfTest {
- /** {@inheritDoc} */
- @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
- GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- cfg.setExternalExecution(false);
-
- return cfg;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java
deleted file mode 100644
index ff8798b..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.lib.output.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop client protocol tests in external process mode.
- */
-@SuppressWarnings("ResultOfMethodCallIgnored")
-public class GridHadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest {
- /** Input path. */
- private static final String PATH_INPUT = "/input";
-
- /** Output path. */
- private static final String PATH_OUTPUT = "/output";
-
- /** Job name. */
- private static final String JOB_NAME = "myJob";
-
- /** Setup lock file. */
- private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
- "ignite-lock-setup.file");
-
- /** Map lock file. */
- private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
- "ignite-lock-map.file");
-
- /** Reduce lock file. */
- private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
- "ignite-lock-reduce.file");
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean restEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrids(gridCount());
-
- setupLockFile.delete();
- mapLockFile.delete();
- reduceLockFile.delete();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
-
- super.afterTestsStopped();
-
-// GridHadoopClientProtocolProvider.cliMap.clear();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- setupLockFile.createNewFile();
- mapLockFile.createNewFile();
- reduceLockFile.createNewFile();
-
- setupLockFile.deleteOnExit();
- mapLockFile.deleteOnExit();
- reduceLockFile.deleteOnExit();
-
- super.beforeTest();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName).format();
-
- setupLockFile.delete();
- mapLockFile.delete();
- reduceLockFile.delete();
-
- super.afterTest();
- }
-
- /**
- * Test next job ID generation.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- private void tstNextJobId() throws Exception {
- GridHadoopClientProtocolProvider provider = provider();
-
- ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT));
-
- JobID jobId = proto.getNewJobID();
-
- assert jobId != null;
- assert jobId.getJtIdentifier() != null;
-
- JobID nextJobId = proto.getNewJobID();
-
- assert nextJobId != null;
- assert nextJobId.getJtIdentifier() != null;
-
- assert !F.eq(jobId, nextJobId);
- }
-
- /**
- * Tests job counters retrieval.
- *
- * @throws Exception If failed.
- */
- public void testJobCounters() throws Exception {
- IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName);
-
- igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
- try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
- new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
- bw.write(
- "alpha\n" +
- "beta\n" +
- "gamma\n" +
- "alpha\n" +
- "beta\n" +
- "gamma\n" +
- "alpha\n" +
- "beta\n" +
- "gamma\n"
- );
- }
-
- Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT);
-
- final Job job = Job.getInstance(conf);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(TestCountingMapper.class);
- job.setReducerClass(TestCountingReducer.class);
- job.setCombinerClass(TestCountingCombiner.class);
-
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
- job.submit();
-
- final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
-
- assertEquals(0, cntr.getValue());
-
- cntr.increment(10);
-
- assertEquals(10, cntr.getValue());
-
- // Transferring to map phase.
- setupLockFile.delete();
-
- // Transferring to reduce phase.
- mapLockFile.delete();
-
- job.waitForCompletion(false);
-
- assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
-
- final Counters counters = job.getCounters();
-
- assertNotNull("counters cannot be null", counters);
- assertEquals("wrong counters count", 3, counters.countCounters());
- assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
- assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
- assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
- }
-
- /**
- * Tests job counters retrieval for unknown job id.
- *
- * @throws Exception If failed.
- */
- private void tstUnknownJobCounters() throws Exception {
- GridHadoopClientProtocolProvider provider = provider();
-
- ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT));
-
- try {
- proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1));
- fail("exception must be thrown");
- }
- catch (Exception e) {
- assert e instanceof IOException : "wrong error has been thrown";
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMap() throws Exception {
- checkJobSubmit(true, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMapCombine() throws Exception {
- checkJobSubmit(false, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMapReduce() throws Exception {
- checkJobSubmit(true, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMapCombineReduce() throws Exception {
- checkJobSubmit(false, false);
- }
-
- /**
- * Test job submission.
- *
- * @param noCombiners Whether there are no combiners.
- * @param noReducers Whether there are no reducers.
- * @throws Exception If failed.
- */
- public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
- IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName);
-
- igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
- try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
- new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
- bw.write("word");
- }
-
- Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT);
-
- final Job job = Job.getInstance(conf);
-
- job.setJobName(JOB_NAME);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(TestMapper.class);
- job.setReducerClass(TestReducer.class);
-
- if (!noCombiners)
- job.setCombinerClass(TestCombiner.class);
-
- if (noReducers)
- job.setNumReduceTasks(0);
-
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TestOutputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
- job.submit();
-
- JobID jobId = job.getJobID();
-
- // Setup phase.
- JobStatus jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
- assert jobStatus.getMapProgress() == 0.0f;
- assert jobStatus.getReduceProgress() == 0.0f;
-
- U.sleep(2100);
-
- JobStatus recentJobStatus = job.getStatus();
-
- assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
- "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
-
- // Transferring to map phase.
- setupLockFile.delete();
-
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- return F.eq(1.0f, job.getStatus().getSetupProgress());
- }
- catch (Exception e) {
- throw new RuntimeException("Unexpected exception.", e);
- }
- }
- }, 5000L);
-
- // Map phase.
- jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
- assert jobStatus.getReduceProgress() == 0.0f;
-
- U.sleep(2100);
-
- recentJobStatus = job.getStatus();
-
- assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
- "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
-
- // Transferring to reduce phase.
- mapLockFile.delete();
-
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- return F.eq(1.0f, job.getStatus().getMapProgress());
- }
- catch (Exception e) {
- throw new RuntimeException("Unexpected exception.", e);
- }
- }
- }, 5000L);
-
- if (!noReducers) {
- // Reduce phase.
- jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() == 1.0f;
- assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
-
- // Ensure that reduces progress increases.
- U.sleep(2100);
-
- recentJobStatus = job.getStatus();
-
- assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
- "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
-
- reduceLockFile.delete();
- }
-
- job.waitForCompletion(false);
-
- jobStatus = job.getStatus();
- checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() == 1.0f;
- assert jobStatus.getReduceProgress() == 1.0f;
-
- dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
- }
-
- /**
- * Dump IGFS content.
- *
- * @param igfs IGFS.
- * @param path Path.
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- private static void dumpIgfs(IgniteFs igfs, IgfsPath path) throws Exception {
- IgfsFile file = igfs.info(path);
-
- assert file != null;
-
- System.out.println(file.path());
-
- if (file.isDirectory()) {
- for (IgfsPath child : igfs.listPaths(path))
- dumpIgfs(igfs, child);
- }
- else {
- try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
- String line = br.readLine();
-
- while (line != null) {
- System.out.println(line);
-
- line = br.readLine();
- }
- }
- }
- }
-
- /**
- * Check job status.
- *
- * @param status Job status.
- * @param expJobId Expected job ID.
- * @param expJobName Expected job name.
- * @param expState Expected state.
- * @param expCleanupProgress Expected cleanup progress.
- * @throws Exception If failed.
- */
- private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
- JobStatus.State expState, float expCleanupProgress) throws Exception {
- assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID();
- assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName();
- assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState();
- assert F.eq(status.getCleanupProgress(), expCleanupProgress) :
- "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress();
- }
-
- /**
- * @return Configuration.
- */
- private Configuration config(int port) {
- Configuration conf = new Configuration();
-
- setupFileSystems(conf);
-
- conf.set(MRConfig.FRAMEWORK_NAME, GridHadoopClientProtocol.FRAMEWORK_NAME);
- conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port);
-
- conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/");
-
- return conf;
- }
-
- /**
- * @return Protocol provider.
- */
- private GridHadoopClientProtocolProvider provider() {
- return new GridHadoopClientProtocolProvider();
- }
-
- /**
- * Test mapper.
- */
- public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
- /** Writable container for writing word. */
- private Text word = new Text();
-
- /** Writable integer constant of '1' is writing as count of found words. */
- private static final IntWritable one = new IntWritable(1);
-
- /** {@inheritDoc} */
- @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- while (mapLockFile.exists())
- Thread.sleep(50);
-
- StringTokenizer wordList = new StringTokenizer(val.toString());
-
- while (wordList.hasMoreTokens()) {
- word.set(wordList.nextToken());
-
- ctx.write(word, one);
- }
- }
- }
-
- /**
- * Test Hadoop counters.
- */
- public enum TestCounter {
- COUNTER1, COUNTER2, COUNTER3
- }
-
- /**
- * Test mapper that uses counters.
- */
- public static class TestCountingMapper extends TestMapper {
- /** {@inheritDoc} */
- @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- super.map(key, val, ctx);
- ctx.getCounter(TestCounter.COUNTER1).increment(1);
- }
- }
-
- /**
- * Test combiner that counts invocations.
- */
- public static class TestCountingCombiner extends TestReducer {
- @Override public void reduce(Text key, Iterable<IntWritable> values,
- Context ctx) throws IOException, InterruptedException {
- ctx.getCounter(TestCounter.COUNTER1).increment(1);
- ctx.getCounter(TestCounter.COUNTER2).increment(1);
-
- int sum = 0;
- for (IntWritable value : values) {
- sum += value.get();
- }
-
- ctx.write(key, new IntWritable(sum));
- }
- }
-
- /**
- * Test reducer that counts invocations.
- */
- public static class TestCountingReducer extends TestReducer {
- @Override public void reduce(Text key, Iterable<IntWritable> values,
- Context ctx) throws IOException, InterruptedException {
- ctx.getCounter(TestCounter.COUNTER1).increment(1);
- ctx.getCounter(TestCounter.COUNTER3).increment(1);
- }
- }
-
- /**
- * Test combiner.
- */
- public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
- // No-op.
- }
-
- public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
- /** {@inheritDoc} */
- @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
- throws IOException {
- return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
- }
- }
-
- /**
- * Test output committer.
- */
- private static class TestOutputCommitter extends FileOutputCommitter {
- /** Delegate. */
- private final FileOutputCommitter delegate;
-
- /**
- * Constructor.
- *
- * @param ctx Task attempt context.
- * @param delegate Delegate.
- * @throws IOException If failed.
- */
- private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
- super(FileOutputFormat.getOutputPath(ctx), ctx);
-
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Override public void setupJob(JobContext jobCtx) throws IOException {
- try {
- while (setupLockFile.exists())
- Thread.sleep(50);
- }
- catch (InterruptedException ignored) {
- throw new IOException("Interrupted.");
- }
-
- delegate.setupJob(jobCtx);
- }
-
- /** {@inheritDoc} */
- @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
- delegate.setupTask(taskCtx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
- return delegate.needsTaskCommit(taskCtx);
- }
-
- /** {@inheritDoc} */
- @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
- delegate.commitTask(taskCtx);
- }
-
- /** {@inheritDoc} */
- @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
- delegate.abortTask(taskCtx);
- }
- }
-
- /**
- * Test reducer.
- */
- public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- /** Writable container for writing sum of word counts. */
- private IntWritable totalWordCnt = new IntWritable();
-
- /** {@inheritDoc} */
- @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
- InterruptedException {
- while (reduceLockFile.exists())
- Thread.sleep(50);
-
- int wordCnt = 0;
-
- for (IntWritable value : values)
- wordCnt += value.get();
-
- totalWordCnt.set(wordCnt);
-
- ctx.write(key, totalWordCnt);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
new file mode 100644
index 0000000..ffa20d1
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.hadoop;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Runs the {@link HadoopClientProtocolSelfTest} suite with external execution switched off,
+ * i.e. in embedded process mode.
+ */
+public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest {
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration embeddedCfg = super.hadoopConfiguration(gridName);
+
+ // The only difference from the parent suite: external execution is disabled.
+ embeddedCfg.setExternalExecution(false);
+
+ return embeddedCfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
new file mode 100644
index 0000000..d19a8ea
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.hadoop.mapreduce.protocol.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.mapreduce.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.proto.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop client protocol tests in external process mode.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
+ /** Input path. */
+ private static final String PATH_INPUT = "/input";
+
+ /** Output path. */
+ private static final String PATH_OUTPUT = "/output";
+
+ /** Job name. */
+ private static final String JOB_NAME = "myJob";
+
+ /** Setup lock file. While it exists, {@code TestOutputCommitter.setupJob()} keeps waiting. */
+ private static final File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+ "ignite-lock-setup.file");
+
+ /** Map lock file. While it exists, {@code TestMapper.map()} keeps waiting. */
+ private static final File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+ "ignite-lock-map.file");
+
+ /** Reduce lock file. While it exists, {@code TestReducer.reduce()} keeps waiting. */
+ private static final File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+ "ignite-lock-reduce.file");
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ // Number of nodes started by beforeTestsStarted() via startGrids(gridCount()).
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean igfsEnabled() {
+ // The tests write job input to and dump job output from grid(0).fileSystem(...).
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean restEnabled() {
+ // config() points MRConfig.MASTER_ADDRESS at HadoopAbstractSelfTest.REST_PORT.
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(gridCount());
+
+ // Remove lock files that may already exist (presumably left over from an earlier run).
+ // The boolean results are deliberately ignored: a missing file is fine here.
+ setupLockFile.delete();
+ mapLockFile.delete();
+ reduceLockFile.delete();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+
+ // NOTE(review): the cleanup of the provider's client map below is disabled;
+ // confirm whether it is still needed or can be dropped.
+// IgniteHadoopClientProtocolProvider.cliMap.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // Create all three phase gates. TestOutputCommitter.setupJob(), TestMapper.map() and
+ // TestReducer.reduce() spin while the corresponding file exists, so a test advances
+ // the job phase by phase by deleting the files.
+ setupLockFile.createNewFile();
+ mapLockFile.createNewFile();
+ reduceLockFile.createNewFile();
+
+ // Safety net in case afterTest() does not get to remove them.
+ setupLockFile.deleteOnExit();
+ mapLockFile.deleteOnExit();
+ reduceLockFile.deleteOnExit();
+
+ super.beforeTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Format IGFS (presumably clearing the input/output data created by the test).
+ grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format();
+
+ // Release every gate that the test did not delete itself.
+ setupLockFile.delete();
+ mapLockFile.delete();
+ reduceLockFile.delete();
+
+ super.afterTest();
+ }
+
+ /**
+ * Verifies that two consecutive job ID requests return distinct IDs, each with a
+ * non-null job tracker identifier.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private void tstNextJobId() throws Exception {
+ ClientProtocol client = provider().create(config(HadoopAbstractSelfTest.REST_PORT));
+
+ JobID firstId = client.getNewJobID();
+
+ assert firstId != null;
+ assert firstId.getJtIdentifier() != null;
+
+ JobID secondId = client.getNewJobID();
+
+ assert secondId != null;
+ assert secondId.getJtIdentifier() != null;
+
+ assert !F.eq(firstId, secondId);
+ }
+
+ /**
+ * Tests job counters retrieval.
+ * <p>
+ * Writes a nine-line input (three distinct words, three times each), runs a job built from
+ * {@link TestCountingMapper}, {@link TestCountingCombiner} and {@link TestCountingReducer},
+ * and checks the counter values reported once the job has finished.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJobCounters() throws Exception {
+ IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
+
+ igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+ try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
+ new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+ bw.write(
+ "alpha\n" +
+ "beta\n" +
+ "gamma\n" +
+ "alpha\n" +
+ "beta\n" +
+ "gamma\n" +
+ "alpha\n" +
+ "beta\n" +
+ "gamma\n"
+ );
+ }
+
+ Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
+
+ final Job job = Job.getInstance(conf);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(TestCountingMapper.class);
+ job.setReducerClass(TestCountingReducer.class);
+ job.setCombinerClass(TestCountingCombiner.class);
+
+ FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+ FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+ job.submit();
+
+ // The setup lock file still exists at this point, so no mapper has run yet.
+ final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
+
+ assertEquals(0, cntr.getValue());
+
+ // Presumably this only changes the client-side copy: the final COUNTER1 value
+ // asserted below (15) does not include these 10.
+ cntr.increment(10);
+
+ assertEquals(10, cntr.getValue());
+
+ // Transferring to map phase.
+ setupLockFile.delete();
+
+ // Transferring to reduce phase.
+ mapLockFile.delete();
+
+ // The reduce lock file is not deleted here: TestCountingCombiner and TestCountingReducer
+ // override reduce() without waiting on it.
+ job.waitForCompletion(false);
+
+ assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
+
+ final Counters counters = job.getCounters();
+
+ // Expected values, assuming the combiner and the reducer are each invoked once per distinct word:
+ // COUNTER1 = 9 map() calls + 3 combiner calls + 3 reducer calls = 15,
+ // COUNTER2 = 3 combiner calls, COUNTER3 = 3 reducer calls.
+ assertNotNull("counters cannot be null", counters);
+ assertEquals("wrong counters count", 3, counters.countCounters());
+ assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
+ assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
+ assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
+ }
+
+ /**
+ * Tests job counters retrieval for unknown job id: the request must fail with an
+ * {@link IOException}.
+ * <p>
+ * Only {@code IOException} is caught. Any other exception propagates and fails the test
+ * with its own stack trace, so the check no longer depends on assertions being enabled.
+ *
+ * @throws Exception If failed.
+ */
+ private void tstUnknownJobCounters() throws Exception {
+ IgniteHadoopClientProtocolProvider provider = provider();
+
+ ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT));
+
+ try {
+ proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1));
+
+ fail("exception must be thrown");
+ }
+ catch (IOException ignored) {
+ // Expected: counters of an unknown job cannot be retrieved.
+ }
+ }
+
+ /**
+ * Checks job submission with a map phase only: no combiner and no reducers.
+ *
+ * @throws Exception If failed.
+ */
+ private void tstJobSubmitMap() throws Exception {
+ checkJobSubmit(true, true);
+ }
+
+ /**
+ * Checks job submission with a combiner but no reducers.
+ *
+ * @throws Exception If failed.
+ */
+ private void tstJobSubmitMapCombine() throws Exception {
+ checkJobSubmit(false, true);
+ }
+
+ /**
+ * Checks job submission with reducers but no combiner.
+ *
+ * @throws Exception If failed.
+ */
+ private void tstJobSubmitMapReduce() throws Exception {
+ checkJobSubmit(true, false);
+ }
+
+ /**
+ * Checks job submission with both a combiner and reducers.
+ *
+ * @throws Exception If failed.
+ */
+ private void tstJobSubmitMapCombineReduce() throws Exception {
+ checkJobSubmit(false, false);
+ }
+
+ /**
+ * Test job submission.
+ * <p>
+ * Submits a one-word job and walks it through the setup, map and (optionally) reduce phases
+ * by deleting the corresponding lock files, checking the reported job status and progress
+ * values at every step.
+ *
+ * @param noCombiners Whether there are no combiners.
+ * @param noReducers Whether there are no reducers.
+ * @throws Exception If failed.
+ */
+ public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
+ IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
+
+ igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+ try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
+ new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+ bw.write("word");
+ }
+
+ Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
+
+ final Job job = Job.getInstance(conf);
+
+ job.setJobName(JOB_NAME);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(TestMapper.class);
+ job.setReducerClass(TestReducer.class);
+
+ if (!noCombiners)
+ job.setCombinerClass(TestCombiner.class);
+
+ if (noReducers)
+ job.setNumReduceTasks(0);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(TestOutputFormat.class);
+
+ FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+ FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+ job.submit();
+
+ JobID jobId = job.getJobID();
+
+ // Setup phase.
+ JobStatus jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+ assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
+ assert jobStatus.getMapProgress() == 0.0f;
+ assert jobStatus.getReduceProgress() == 0.0f;
+
+ // Ensure that setup progress increases while the phase is still blocked.
+ U.sleep(2100);
+
+ JobStatus recentJobStatus = job.getStatus();
+
+ assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
+ "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
+
+ // Transferring to map phase.
+ setupLockFile.delete();
+
+ assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return F.eq(1.0f, job.getStatus().getSetupProgress());
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unexpected exception.", e);
+ }
+ }
+ }, 5000L);
+
+ // Map phase.
+ jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+ assert jobStatus.getSetupProgress() == 1.0f;
+ assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
+ assert jobStatus.getReduceProgress() == 0.0f;
+
+ // Ensure that map progress increases while the phase is still blocked.
+ U.sleep(2100);
+
+ recentJobStatus = job.getStatus();
+
+ assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
+ "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
+
+ // Transferring to reduce phase.
+ mapLockFile.delete();
+
+ assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return F.eq(1.0f, job.getStatus().getMapProgress());
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unexpected exception.", e);
+ }
+ }
+ }, 5000L);
+
+ if (!noReducers) {
+ // Reduce phase.
+ jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+ assert jobStatus.getSetupProgress() == 1.0f;
+ assert jobStatus.getMapProgress() == 1.0f;
+ assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
+
+ // Ensure that reduces progress increases.
+ U.sleep(2100);
+
+ recentJobStatus = job.getStatus();
+
+ assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
+ "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
+
+ reduceLockFile.delete();
+ }
+
+ job.waitForCompletion(false);
+
+ // Validate a single status snapshot: the same object is used for the state check
+ // and for the progress assertions that follow.
+ jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
+ assert jobStatus.getSetupProgress() == 1.0f;
+ assert jobStatus.getMapProgress() == 1.0f;
+ assert jobStatus.getReduceProgress() == 1.0f;
+
+ dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+ }
+
+ /**
+ * Prints the given IGFS path to standard output. Directories are walked recursively;
+ * for a file its content is printed line by line.
+ *
+ * @param igfs IGFS.
+ * @param path Path.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception {
+ IgfsFile info = igfs.info(path);
+
+ assert info != null;
+
+ System.out.println(info.path());
+
+ if (info.isDirectory()) {
+ for (IgfsPath child : igfs.listPaths(path))
+ dumpIgfs(igfs, child);
+
+ return;
+ }
+
+ try (BufferedReader rdr = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine())
+ System.out.println(line);
+ }
+ }
+
+ /**
+ * Asserts that the job status carries the expected job ID, job name, state and
+ * cleanup progress.
+ *
+ * @param status Job status.
+ * @param expJobId Expected job ID.
+ * @param expJobName Expected job name.
+ * @param expState Expected state.
+ * @param expCleanupProgress Expected cleanup progress.
+ * @throws Exception If failed.
+ */
+ private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
+ JobStatus.State expState, float expCleanupProgress) throws Exception {
+ JobID actualJobId = status.getJobID();
+
+ assert F.eq(actualJobId, expJobId) : "Expected=" + expJobId + ", actual=" + actualJobId;
+
+ String actualJobName = status.getJobName();
+
+ assert F.eq(actualJobName, expJobName) : "Expected=" + expJobName + ", actual=" + actualJobName;
+
+ JobStatus.State actualState = status.getState();
+
+ assert F.eq(actualState, expState) : "Expected=" + expState + ", actual=" + actualState;
+
+ float actualCleanupProgress = status.getCleanupProgress();
+
+ assert F.eq(actualCleanupProgress, expCleanupProgress) :
+ "Expected=" + expCleanupProgress + ", actual=" + actualCleanupProgress;
+ }
+
+ /**
+ * Builds the client-side Hadoop configuration: file systems are set up first, then the
+ * framework name, the master address on {@code 127.0.0.1} and the default file system.
+ *
+ * @param port Port used in the master address.
+ * @return Configuration.
+ */
+ private Configuration config(int port) {
+ Configuration clientCfg = new Configuration();
+
+ setupFileSystems(clientCfg);
+
+ clientCfg.set(MRConfig.FRAMEWORK_NAME, HadoopClientProtocol.FRAMEWORK_NAME);
+
+ String masterAddr = "127.0.0.1:" + port;
+
+ clientCfg.set(MRConfig.MASTER_ADDRESS, masterAddr);
+
+ String dfltFsUri = "igfs://:" + getTestGridName(0) + "@/";
+
+ clientCfg.set("fs.defaultFS", dfltFsUri);
+
+ return clientCfg;
+ }
+
+ /**
+ * Creates a new protocol provider instance on every call.
+ *
+ * @return Protocol provider.
+ */
+ private IgniteHadoopClientProtocolProvider provider() {
+ return new IgniteHadoopClientProtocolProvider();
+ }
+
+ /**
+ * Test mapper. Waits until the map lock file is gone, then emits {@code (word, 1)} for
+ * every whitespace-separated token of the input value.
+ */
+ public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+ /** Writable integer constant of '1' is writing as count of found words. */
+ private static final IntWritable one = new IntWritable(1);
+
+ /** Writable container for writing word. */
+ private Text word = new Text();
+
+ /** {@inheritDoc} */
+ @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+ // Gate: the test releases the map phase by deleting the lock file.
+ while (mapLockFile.exists())
+ Thread.sleep(50);
+
+ for (StringTokenizer tokens = new StringTokenizer(val.toString()); tokens.hasMoreTokens(); ) {
+ word.set(tokens.nextToken());
+
+ ctx.write(word, one);
+ }
+ }
+ }
+
+ /**
+ * Test Hadoop counters.
+ * <ul>
+ * <li>{@code COUNTER1} is incremented by {@code TestCountingMapper}, {@code TestCountingCombiner}
+ * and {@code TestCountingReducer};</li>
+ * <li>{@code COUNTER2} is incremented by {@code TestCountingCombiner} only;</li>
+ * <li>{@code COUNTER3} is incremented by {@code TestCountingReducer} only.</li>
+ * </ul>
+ */
+ public enum TestCounter {
+ COUNTER1, COUNTER2, COUNTER3
+ }
+
+ /**
+ * Test mapper that uses counters: behaves like {@link TestMapper} and additionally
+ * increments {@code COUNTER1} once per {@code map()} call.
+ */
+ public static class TestCountingMapper extends TestMapper {
+ /** {@inheritDoc} */
+ @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+ // The parent implementation waits on the map lock file before emitting words.
+ super.map(key, val, ctx);
+ ctx.getCounter(TestCounter.COUNTER1).increment(1);
+ }
+ }
+
+ /**
+ * Test combiner that counts invocations: each call increments {@code COUNTER1} and
+ * {@code COUNTER2} and emits the sum of the values for the key. The parent
+ * implementation is not called, so the reduce lock file is not consulted.
+ */
+ public static class TestCountingCombiner extends TestReducer {
+ /** {@inheritDoc} */
+ @Override public void reduce(Text key, Iterable<IntWritable> values,
+ Context ctx) throws IOException, InterruptedException {
+ ctx.getCounter(TestCounter.COUNTER1).increment(1);
+ ctx.getCounter(TestCounter.COUNTER2).increment(1);
+
+ int total = 0;
+
+ for (Iterator<IntWritable> it = values.iterator(); it.hasNext(); )
+ total += it.next().get();
+
+ IntWritable combined = new IntWritable(total);
+
+ ctx.write(key, combined);
+ }
+ }
+
+ /**
+ * Test reducer that counts invocations: each call increments {@code COUNTER1} and
+ * {@code COUNTER3}. It writes no output and, because the parent implementation is
+ * not called, does not wait on the reduce lock file.
+ */
+ public static class TestCountingReducer extends TestReducer {
+ /** {@inheritDoc} */
+ @Override public void reduce(Text key, Iterable<IntWritable> values,
+ Context ctx) throws IOException, InterruptedException {
+ ctx.getCounter(TestCounter.COUNTER1).increment(1);
+ ctx.getCounter(TestCounter.COUNTER3).increment(1);
+ }
+ }
+
+ /**
+ * Test combiner. Declares no methods of its own, so every call goes to the inherited
+ * {@link Reducer} implementation; in particular it does not wait on any lock file.
+ */
+ public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
+ // No-op.
+ }
+
+ /**
+ * Test output format that wraps the committer created by {@link TextOutputFormat}
+ * into a {@code TestOutputCommitter}.
+ *
+ * @param <K> Output key type.
+ * @param <V> Output value type.
+ */
+ public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
+ /** {@inheritDoc} */
+ @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
+ throws IOException {
+ // The parent's committer is cast to FileOutputCommitter and used as the delegate.
+ return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
+ }
+ }
+
+ /**
+ * Test output committer. Forwards the overridden operations to the wrapped
+ * {@link FileOutputCommitter}, but holds {@link #setupJob(JobContext)} back for as long
+ * as the setup lock file exists.
+ */
+ private static class TestOutputCommitter extends FileOutputCommitter {
+ /** Delegate. */
+ private final FileOutputCommitter delegate;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Task attempt context.
+ * @param delegate Delegate.
+ * @throws IOException If failed.
+ */
+ private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
+ super(FileOutputFormat.getOutputPath(ctx), ctx);
+
+ this.delegate = delegate;
+ }
+
+ /**
+ * Waits until the setup lock file is deleted, then delegates.
+ *
+ * @param jobCtx Job context.
+ * @throws IOException If the wait was interrupted or the delegate failed.
+ */
+ @Override public void setupJob(JobContext jobCtx) throws IOException {
+ try {
+ while (setupLockFile.exists())
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e) {
+ // Restore the interrupt flag for the caller and keep the original exception as the cause.
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Interrupted.", e);
+ }
+
+ delegate.setupJob(jobCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
+ delegate.setupTask(taskCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
+ return delegate.needsTaskCommit(taskCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
+ delegate.commitTask(taskCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
+ delegate.abortTask(taskCtx);
+ }
+ }
+
+ /**
+ * Test reducer. Waits until the reduce lock file is gone, then emits the sum of all
+ * counts received for the key.
+ */
+ public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+ /** Writable container for writing sum of word counts. */
+ private IntWritable totalWordCnt = new IntWritable();
+
+ /** {@inheritDoc} */
+ @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+ InterruptedException {
+ // Gate: the test releases the reduce phase by deleting the lock file.
+ while (reduceLockFile.exists())
+ Thread.sleep(50);
+
+ int sum = 0;
+
+ for (Iterator<IntWritable> it = values.iterator(); it.hasNext(); )
+ sum += it.next().get();
+
+ totalWordCnt.set(sum);
+
+ ctx.write(key, totalWordCnt);
+ }
+ }
+}