You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 15:26:21 UTC
[31/68] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented
new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
deleted file mode 100644
index 4b1121c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Comparator;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1CleanupTask;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1MapTask;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Partitioner;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1ReduceTask;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1SetupTask;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.unwrapSplit;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-
-/**
- * Context for task execution.
- */
-public class HadoopV2TaskContext extends HadoopTaskContext {
- /** */
- private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
-
- /** Lazy per-user file system cache used by the Hadoop task. */
- private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
- = createHadoopLazyConcurrentMap();
-
- /**
- * This method is called with reflection upon Job finish with class loader of each task.
- * This will clean up all the Fs created for specific task.
- * Each class loader sees uses its own instance of <code>fsMap<code/> since the class loaders
- * are different.
- *
- * @throws IgniteCheckedException On error.
- */
- public static void close() throws IgniteCheckedException {
- fsMap.close();
- }
-
- /**
- * Check for combiner grouping support (available since Hadoop 2.3).
- */
- static {
- boolean ok;
-
- try {
- JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
-
- ok = true;
- }
- catch (NoSuchMethodException ignore) {
- ok = false;
- }
-
- COMBINE_KEY_GROUPING_SUPPORTED = ok;
- }
-
- /** Flag is set if new context-object code is used for running the mapper. */
- private final boolean useNewMapper;
-
- /** Flag is set if new context-object code is used for running the reducer. */
- private final boolean useNewReducer;
-
- /** Flag is set if new context-object code is used for running the combiner. */
- private final boolean useNewCombiner;
-
- /** */
- private final JobContextImpl jobCtx;
-
- /** Set if task is to cancelling. */
- private volatile boolean cancelled;
-
- /** Current task. */
- private volatile HadoopTask task;
-
- /** Local node ID */
- private final UUID locNodeId;
-
- /** Counters for task. */
- private final HadoopCounters cntrs = new HadoopCountersImpl();
-
- /**
- * @param taskInfo Task info.
- * @param job Job.
- * @param jobId Job ID.
- * @param locNodeId Local node ID.
- * @param jobConfDataInput DataInput for read JobConf.
- */
- public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
- @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
- super(taskInfo, job);
- this.locNodeId = locNodeId;
-
- // Before create JobConf instance we should set new context class loader.
- ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
- try {
- JobConf jobConf = new JobConf();
-
- try {
- jobConf.readFields(jobConfDataInput);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- // For map-reduce jobs prefer local writes.
- jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
-
- jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
-
- useNewMapper = jobConf.getUseNewMapper();
- useNewReducer = jobConf.getUseNewReducer();
- useNewCombiner = jobConf.getCombinerClass() == null;
- }
- finally {
- HadoopUtils.restoreContextClassLoader(oldLdr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
- return cntrs.counter(grp, name, cls);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopCounters counters() {
- return cntrs;
- }
-
- /**
- * Creates appropriate task from current task info.
- *
- * @return Task.
- */
- private HadoopTask createTask() {
- boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
-
- switch (taskInfo().type()) {
- case SETUP:
- return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
-
- case MAP:
- return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
-
- case REDUCE:
- return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
- new HadoopV1ReduceTask(taskInfo(), true);
-
- case COMBINE:
- return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
- new HadoopV1ReduceTask(taskInfo(), false);
-
- case COMMIT:
- case ABORT:
- return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
- new HadoopV1CleanupTask(taskInfo(), isAbort);
-
- default:
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void run() throws IgniteCheckedException {
- ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
-
- try {
- try {
- task = createTask();
- }
- catch (Throwable e) {
- if (e instanceof Error)
- throw e;
-
- throw transformException(e);
- }
-
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- try {
- task.run(this);
- }
- catch (Throwable e) {
- if (e instanceof Error)
- throw e;
-
- throw transformException(e);
- }
- }
- finally {
- task = null;
-
- HadoopUtils.restoreContextClassLoader(oldLdr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- cancelled = true;
-
- HadoopTask t = task;
-
- if (t != null)
- t.cancel();
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
- File locDir;
-
- switch(taskInfo().type()) {
- case MAP:
- case REDUCE:
- job().prepareTaskEnvironment(taskInfo());
-
- locDir = taskLocalDir(locNodeId, taskInfo());
-
- break;
-
- default:
- locDir = jobLocalDir(locNodeId, taskInfo().jobId());
- }
-
- ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
-
- try {
- FileSystem.get(jobConf());
-
- LocalFileSystem locFs = FileSystem.getLocal(jobConf());
-
- locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
- }
- catch (Throwable e) {
- if (e instanceof Error)
- throw (Error)e;
-
- throw transformException(e);
- }
- finally {
- HadoopUtils.restoreContextClassLoader(oldLdr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
- job().cleanupTaskEnvironment(taskInfo());
- }
-
- /**
- * Creates Hadoop attempt ID.
- *
- * @return Attempt ID.
- */
- public TaskAttemptID attemptId() {
- TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
-
- return new TaskAttemptID(tid, taskInfo().attempt());
- }
-
- /**
- * @param type Task type.
- * @return Hadoop task type.
- */
- private TaskType taskType(HadoopTaskType type) {
- switch (type) {
- case SETUP:
- return TaskType.JOB_SETUP;
- case MAP:
- case COMBINE:
- return TaskType.MAP;
-
- case REDUCE:
- return TaskType.REDUCE;
-
- case COMMIT:
- case ABORT:
- return TaskType.JOB_CLEANUP;
-
- default:
- return null;
- }
- }
-
- /**
- * Gets job configuration of the task.
- *
- * @return Job configuration.
- */
- public JobConf jobConf() {
- return jobCtx.getJobConf();
- }
-
- /**
- * Gets job context of the task.
- *
- * @return Job context.
- */
- public JobContextImpl jobContext() {
- return jobCtx;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
- Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
-
- if (partClsOld != null)
- return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
-
- try {
- return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
- }
- catch (ClassNotFoundException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * Gets serializer for specified class.
- *
- * @param cls Class.
- * @param jobConf Job configuration.
- * @return Appropriate serializer.
- */
- @SuppressWarnings("unchecked")
- private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
- A.notNull(cls, "cls");
-
- SerializationFactory factory = new SerializationFactory(jobConf);
-
- Serialization<?> serialization = factory.getSerialization(cls);
-
- if (serialization == null)
- throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
-
- if (serialization.getClass() == WritableSerialization.class)
- return new HadoopWritableSerialization((Class<? extends Writable>)cls);
-
- return new HadoopSerializationWrapper(serialization, cls);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
- return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
- }
-
- /** {@inheritDoc} */
- @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
- return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
- }
-
- /** {@inheritDoc} */
- @Override public Comparator<Object> sortComparator() {
- return (Comparator<Object>)jobCtx.getSortComparator();
- }
-
- /** {@inheritDoc} */
- @Override public Comparator<Object> groupComparator() {
- Comparator<?> res;
-
- switch (taskInfo().type()) {
- case COMBINE:
- res = COMBINE_KEY_GROUPING_SUPPORTED ?
- jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
-
- break;
-
- case REDUCE:
- res = jobContext().getGroupingComparator();
-
- break;
-
- default:
- return null;
- }
-
- if (res != null && res.getClass() != sortComparator().getClass())
- return (Comparator<Object>)res;
-
- return null;
- }
-
- /**
- * @param split Split.
- * @return Native Hadoop split.
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
- if (split instanceof HadoopExternalSplit)
- return readExternalSplit((HadoopExternalSplit)split);
-
- if (split instanceof HadoopSplitWrapper)
- return unwrapSplit((HadoopSplitWrapper)split);
-
- throw new IllegalStateException("Unknown split: " + split);
- }
-
- /**
- * @param split External split.
- * @return Native input split.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
- Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
-
- FileSystem fs;
-
- try {
- // This assertion uses .startsWith() instead of .equals() because task class loaders may
- // be reused between tasks of the same job.
- assert ((HadoopClassLoader)getClass().getClassLoader()).name()
- .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
-
- // We also cache Fs there, all them will be cleared explicitly upon the Job end.
- fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- try (
- FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
-
- in.seek(split.offset());
-
- String clsName = Text.readString(in);
-
- Class<?> cls = jobConf().getClassByName(clsName);
-
- assert cls != null;
-
- Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
-
- Deserializer deserializer = serialization.getDeserializer(cls);
-
- deserializer.open(in);
-
- Object res = deserializer.deserialize(null);
-
- deserializer.close();
-
- assert res != null;
-
- return res;
- }
- catch (IOException | ClassNotFoundException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
- String user = job.info().user();
-
- user = IgfsUtils.fixUserName(user);
-
- assert user != null;
-
- String ugiUser;
-
- try {
- UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
-
- assert currUser != null;
-
- ugiUser = currUser.getShortUserName();
- }
- catch (IOException ioe) {
- throw new IgniteCheckedException(ioe);
- }
-
- try {
- if (F.eq(user, ugiUser))
- // if current UGI context user is the same, do direct call:
- return c.call();
- else {
- UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
-
- return ugi.doAs(new PrivilegedExceptionAction<T>() {
- @Override public T run() throws Exception {
- return c.call();
- }
- });
- }
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
deleted file mode 100644
index f46f068..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Optimized serialization for Hadoop {@link Writable} types.
- */
-public class HadoopWritableSerialization implements HadoopSerialization {
- /** */
- private final Class<? extends Writable> cls;
-
- /**
- * @param cls Class.
- */
- public HadoopWritableSerialization(Class<? extends Writable> cls) {
- assert cls != null;
-
- this.cls = cls;
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
- assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
-
- try {
- ((Writable)obj).write(out);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
- Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
-
- try {
- w.readFields(in);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- return w;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- // No-op.
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
deleted file mode 100644
index 5a20a75..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.ignite.configuration.HadoopConfiguration;
-
-/**
- * Hadoop client protocol tests in embedded process mode.
- */
-public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest {
- /** {@inheritDoc} */
- @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
- HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.setExternalExecution(false);
-
- return cfg;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
deleted file mode 100644
index 1344e26..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ /dev/null
@@ -1,654 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.StringTokenizer;
-import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.HadoopAbstractSelfTest;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-
-/**
- * Hadoop client protocol tests in external process mode.
- */
-@SuppressWarnings("ResultOfMethodCallIgnored")
-public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
- /** Input path. */
- private static final String PATH_INPUT = "/input";
-
- /** Output path. */
- private static final String PATH_OUTPUT = "/output";
-
- /** Job name. */
- private static final String JOB_NAME = "myJob";
-
- /** Setup lock file. */
- private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
- "ignite-lock-setup.file");
-
- /** Map lock file. */
- private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
- "ignite-lock-map.file");
-
- /** Reduce lock file. */
- private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
- "ignite-lock-reduce.file");
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean igfsEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean restEnabled() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrids(gridCount());
-
- setupLockFile.delete();
- mapLockFile.delete();
- reduceLockFile.delete();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
-
- super.afterTestsStopped();
-
-// IgniteHadoopClientProtocolProvider.cliMap.clear();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- setupLockFile.createNewFile();
- mapLockFile.createNewFile();
- reduceLockFile.createNewFile();
-
- setupLockFile.deleteOnExit();
- mapLockFile.deleteOnExit();
- reduceLockFile.deleteOnExit();
-
- super.beforeTest();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format();
-
- setupLockFile.delete();
- mapLockFile.delete();
- reduceLockFile.delete();
-
- super.afterTest();
- }
-
- /**
- * Test next job ID generation.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- private void tstNextJobId() throws Exception {
- IgniteHadoopClientProtocolProvider provider = provider();
-
- ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT));
-
- JobID jobId = proto.getNewJobID();
-
- assert jobId != null;
- assert jobId.getJtIdentifier() != null;
-
- JobID nextJobId = proto.getNewJobID();
-
- assert nextJobId != null;
- assert nextJobId.getJtIdentifier() != null;
-
- assert !F.eq(jobId, nextJobId);
- }
-
- /**
- * Tests job counters retrieval.
- *
- * @throws Exception If failed.
- */
- public void testJobCounters() throws Exception {
- IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
-
- igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
- try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
- new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
- bw.write(
- "alpha\n" +
- "beta\n" +
- "gamma\n" +
- "alpha\n" +
- "beta\n" +
- "gamma\n" +
- "alpha\n" +
- "beta\n" +
- "gamma\n"
- );
- }
-
- Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
-
- final Job job = Job.getInstance(conf);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(TestCountingMapper.class);
- job.setReducerClass(TestCountingReducer.class);
- job.setCombinerClass(TestCountingCombiner.class);
-
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
- job.submit();
-
- final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
-
- assertEquals(0, cntr.getValue());
-
- cntr.increment(10);
-
- assertEquals(10, cntr.getValue());
-
- // Transferring to map phase.
- setupLockFile.delete();
-
- // Transferring to reduce phase.
- mapLockFile.delete();
-
- job.waitForCompletion(false);
-
- assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
-
- final Counters counters = job.getCounters();
-
- assertNotNull("counters cannot be null", counters);
- assertEquals("wrong counters count", 3, counters.countCounters());
- assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
- assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
- assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
- }
-
- /**
- * Tests job counters retrieval for unknown job id.
- *
- * @throws Exception If failed.
- */
- private void tstUnknownJobCounters() throws Exception {
- IgniteHadoopClientProtocolProvider provider = provider();
-
- ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT));
-
- try {
- proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1));
- fail("exception must be thrown");
- }
- catch (Exception e) {
- assert e instanceof IOException : "wrong error has been thrown";
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMap() throws Exception {
- checkJobSubmit(true, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMapCombine() throws Exception {
- checkJobSubmit(false, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMapReduce() throws Exception {
- checkJobSubmit(true, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void tstJobSubmitMapCombineReduce() throws Exception {
- checkJobSubmit(false, false);
- }
-
- /**
- * Test job submission.
- *
- * @param noCombiners Whether there are no combiners.
- * @param noReducers Whether there are no reducers.
- * @throws Exception If failed.
- */
- public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
- IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
-
- igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
- try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
- new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
- bw.write("word");
- }
-
- Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
-
- final Job job = Job.getInstance(conf);
-
- job.setJobName(JOB_NAME);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(TestMapper.class);
- job.setReducerClass(TestReducer.class);
-
- if (!noCombiners)
- job.setCombinerClass(TestCombiner.class);
-
- if (noReducers)
- job.setNumReduceTasks(0);
-
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TestOutputFormat.class);
-
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
- job.submit();
-
- JobID jobId = job.getJobID();
-
- // Setup phase.
- JobStatus jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
- assert jobStatus.getMapProgress() == 0.0f;
- assert jobStatus.getReduceProgress() == 0.0f;
-
- U.sleep(2100);
-
- JobStatus recentJobStatus = job.getStatus();
-
- assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
- "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
-
- // Transferring to map phase.
- setupLockFile.delete();
-
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- return F.eq(1.0f, job.getStatus().getSetupProgress());
- }
- catch (Exception e) {
- throw new RuntimeException("Unexpected exception.", e);
- }
- }
- }, 5000L);
-
- // Map phase.
- jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
- assert jobStatus.getReduceProgress() == 0.0f;
-
- U.sleep(2100);
-
- recentJobStatus = job.getStatus();
-
- assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
- "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
-
- // Transferring to reduce phase.
- mapLockFile.delete();
-
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- return F.eq(1.0f, job.getStatus().getMapProgress());
- }
- catch (Exception e) {
- throw new RuntimeException("Unexpected exception.", e);
- }
- }
- }, 5000L);
-
- if (!noReducers) {
- // Reduce phase.
- jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() == 1.0f;
- assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
-
- // Ensure that reduces progress increases.
- U.sleep(2100);
-
- recentJobStatus = job.getStatus();
-
- assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
- "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
-
- reduceLockFile.delete();
- }
-
- job.waitForCompletion(false);
-
- jobStatus = job.getStatus();
- checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() == 1.0f;
- assert jobStatus.getReduceProgress() == 1.0f;
-
- dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
- }
-
- /**
- * Dump IGFS content.
- *
- * @param igfs IGFS.
- * @param path Path.
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception {
- IgfsFile file = igfs.info(path);
-
- assert file != null;
-
- System.out.println(file.path());
-
- if (file.isDirectory()) {
- for (IgfsPath child : igfs.listPaths(path))
- dumpIgfs(igfs, child);
- }
- else {
- try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
- String line = br.readLine();
-
- while (line != null) {
- System.out.println(line);
-
- line = br.readLine();
- }
- }
- }
- }
-
- /**
- * Check job status.
- *
- * @param status Job status.
- * @param expJobId Expected job ID.
- * @param expJobName Expected job name.
- * @param expState Expected state.
- * @param expCleanupProgress Expected cleanup progress.
- * @throws Exception If failed.
- */
- private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
- JobStatus.State expState, float expCleanupProgress) throws Exception {
- assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID();
- assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName();
- assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState();
- assert F.eq(status.getCleanupProgress(), expCleanupProgress) :
- "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress();
- }
-
- /**
- * @return Configuration.
- */
- private Configuration config(int port) {
- Configuration conf = HadoopUtils.safeCreateConfiguration();
-
- setupFileSystems(conf);
-
- conf.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME);
- conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port);
-
- conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/");
-
- return conf;
- }
-
- /**
- * @return Protocol provider.
- */
- private IgniteHadoopClientProtocolProvider provider() {
- return new IgniteHadoopClientProtocolProvider();
- }
-
- /**
- * Test mapper.
- */
- public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
- /** Writable container for writing word. */
- private Text word = new Text();
-
- /** Writable integer constant of '1' is writing as count of found words. */
- private static final IntWritable one = new IntWritable(1);
-
- /** {@inheritDoc} */
- @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- while (mapLockFile.exists())
- Thread.sleep(50);
-
- StringTokenizer wordList = new StringTokenizer(val.toString());
-
- while (wordList.hasMoreTokens()) {
- word.set(wordList.nextToken());
-
- ctx.write(word, one);
- }
- }
- }
-
- /**
- * Test Hadoop counters.
- */
- public enum TestCounter {
- COUNTER1, COUNTER2, COUNTER3
- }
-
- /**
- * Test mapper that uses counters.
- */
- public static class TestCountingMapper extends TestMapper {
- /** {@inheritDoc} */
- @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
- super.map(key, val, ctx);
- ctx.getCounter(TestCounter.COUNTER1).increment(1);
- }
- }
-
- /**
- * Test combiner that counts invocations.
- */
- public static class TestCountingCombiner extends TestReducer {
- @Override public void reduce(Text key, Iterable<IntWritable> values,
- Context ctx) throws IOException, InterruptedException {
- ctx.getCounter(TestCounter.COUNTER1).increment(1);
- ctx.getCounter(TestCounter.COUNTER2).increment(1);
-
- int sum = 0;
- for (IntWritable value : values)
- sum += value.get();
-
- ctx.write(key, new IntWritable(sum));
- }
- }
-
- /**
- * Test reducer that counts invocations.
- */
- public static class TestCountingReducer extends TestReducer {
- @Override public void reduce(Text key, Iterable<IntWritable> values,
- Context ctx) throws IOException, InterruptedException {
- ctx.getCounter(TestCounter.COUNTER1).increment(1);
- ctx.getCounter(TestCounter.COUNTER3).increment(1);
- }
- }
-
- /**
- * Test combiner.
- */
- public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
- // No-op.
- }
-
- public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
- /** {@inheritDoc} */
- @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
- throws IOException {
- return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
- }
- }
-
- /**
- * Test output committer.
- */
- private static class TestOutputCommitter extends FileOutputCommitter {
- /** Delegate. */
- private final FileOutputCommitter delegate;
-
- /**
- * Constructor.
- *
- * @param ctx Task attempt context.
- * @param delegate Delegate.
- * @throws IOException If failed.
- */
- private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
- super(FileOutputFormat.getOutputPath(ctx), ctx);
-
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Override public void setupJob(JobContext jobCtx) throws IOException {
- try {
- while (setupLockFile.exists())
- Thread.sleep(50);
- }
- catch (InterruptedException ignored) {
- throw new IOException("Interrupted.");
- }
-
- delegate.setupJob(jobCtx);
- }
-
- /** {@inheritDoc} */
- @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
- delegate.setupTask(taskCtx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
- return delegate.needsTaskCommit(taskCtx);
- }
-
- /** {@inheritDoc} */
- @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
- delegate.commitTask(taskCtx);
- }
-
- /** {@inheritDoc} */
- @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
- delegate.abortTask(taskCtx);
- }
- }
-
- /**
- * Test reducer.
- */
- public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- /** Writable container for writing sum of word counts. */
- private IntWritable totalWordCnt = new IntWritable();
-
- /** {@inheritDoc} */
- @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
- InterruptedException {
- while (reduceLockFile.exists())
- Thread.sleep(50);
-
- int wordCnt = 0;
-
- for (IntWritable value : values)
- wordCnt += value.get();
-
- totalWordCnt.set(wordCnt);
-
- ctx.write(key, totalWordCnt);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
deleted file mode 100644
index 6f910f1..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.cache;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.IgniteTxConfigCacheSelfTest;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-/**
- * Test checks whether hadoop system cache doesn't use user defined TX config.
- */
-public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest {
- /**
- * Success if system caches weren't timed out.
- *
- * @throws Exception
- */
- public void testSystemCacheTx() throws Exception {
- final Ignite ignite = grid(0);
-
- final IgniteInternalCache<Object, Object> hadoopCache = getSystemCache(ignite, CU.SYS_CACHE_HADOOP_MR);
-
- checkImplicitTxSuccess(hadoopCache);
- checkStartTxSuccess(hadoopCache);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
deleted file mode 100644
index ea7fa99..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.concurrent.Callable;
-
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assert;
-
-/**
- * Tests KerberosHadoopFileSystemFactory.
- */
-public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest {
- /**
- * Test parameters validation.
- *
- * @throws Exception If failed.
- */
- public void testParameters() throws Exception {
- checkParameters(null, null, -1);
-
- checkParameters(null, null, 100);
- checkParameters(null, "b", -1);
- checkParameters("a", null, -1);
-
- checkParameters(null, "b", 100);
- checkParameters("a", null, 100);
- checkParameters("a", "b", -1);
- }
-
- /**
- * Check parameters.
- *
- * @param keyTab Key tab.
- * @param keyTabPrincipal Key tab principal.
- * @param reloginInterval Re-login interval.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) {
- final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
-
- fac.setKeyTab(keyTab);
- fac.setKeyTabPrincipal(keyTabPrincipal);
- fac.setReloginInterval(reloginInterval);
-
- GridTestUtils.assertThrows(null, new Callable<Object>() {
- @Override public Object call() throws Exception {
- fac.start();
-
- return null;
- }
- }, IllegalArgumentException.class, null);
- }
-
- /**
- * Checks serializatuion and deserialization of the secure factory.
- *
- * @throws Exception If failed.
- */
- public void testSerialization() throws Exception {
- KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
-
- checkSerialization(fac);
-
- fac = new KerberosHadoopFileSystemFactory();
-
- fac.setUri("igfs://igfs@localhost:10500/");
- fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml");
- fac.setKeyTabPrincipal("foo");
- fac.setKeyTab("/etc/krb5.keytab");
- fac.setReloginInterval(30 * 60 * 1000L);
-
- checkSerialization(fac);
- }
-
- /**
- * Serializes the factory,
- *
- * @param fac The facory to check.
- * @throws Exception If failed.
- */
- private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- ObjectOutput oo = new ObjectOutputStream(baos);
-
- oo.writeObject(fac);
-
- ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-
- KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject();
-
- assertEquals(fac.getUri(), fac2.getUri());
- Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths());
- assertEquals(fac.getKeyTab(), fac2.getKeyTab());
- assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal());
- assertEquals(fac.getReloginInterval(), fac2.getReloginInterval());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
deleted file mode 100644
index fd8fdef..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.util;
-
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Test for basic user name mapper.
- */
-public class BasicUserNameMapperSelfTest extends GridCommonAbstractTest {
- /**
- * Test null mappings.
- *
- * @throws Exception If failed.
- */
- public void testNullMappings() throws Exception {
- checkNullOrEmptyMappings(null);
- }
-
- /**
- * Test empty mappings.
- *
- * @throws Exception If failed.
- */
- public void testEmptyMappings() throws Exception {
- checkNullOrEmptyMappings(new HashMap<String, String>());
- }
-
- /**
- * Check null or empty mappings.
- *
- * @param map Mappings.
- * @throws Exception If failed.
- */
- private void checkNullOrEmptyMappings(@Nullable Map<String, String> map) throws Exception {
- BasicUserNameMapper mapper = create(map, false, null);
-
- assertNull(mapper.map(null));
- assertEquals("1", mapper.map("1"));
- assertEquals("2", mapper.map("2"));
-
- mapper = create(map, true, null);
-
- assertNull(mapper.map(null));
- assertNull(mapper.map("1"));
- assertNull(mapper.map("2"));
-
- mapper = create(map, false, "A");
-
- assertNull(mapper.map(null));
- assertEquals("1", mapper.map("1"));
- assertEquals("2", mapper.map("2"));
-
- mapper = create(map, true, "A");
-
- assertEquals("A", mapper.map(null));
- assertEquals("A", mapper.map("1"));
- assertEquals("A", mapper.map("2"));
- }
-
- /**
- * Test regular mappings.
- *
- * @throws Exception If failed.
- */
- public void testMappings() throws Exception {
- Map<String, String> map = new HashMap<>();
-
- map.put("1", "101");
-
- BasicUserNameMapper mapper = create(map, false, null);
-
- assertNull(mapper.map(null));
- assertEquals("101", mapper.map("1"));
- assertEquals("2", mapper.map("2"));
-
- mapper = create(map, true, null);
-
- assertNull(mapper.map(null));
- assertEquals("101", mapper.map("1"));
- assertNull(mapper.map("2"));
-
- mapper = create(map, false, "A");
-
- assertNull(mapper.map(null));
- assertEquals("101", mapper.map("1"));
- assertEquals("2", mapper.map("2"));
-
- mapper = create(map, true, "A");
-
- assertEquals("A", mapper.map(null));
- assertEquals("101", mapper.map("1"));
- assertEquals("A", mapper.map("2"));
- }
-
- /**
- * Create mapper.
- *
- * @param dictionary Dictionary.
- * @param useDfltUsrName Whether to use default user name.
- * @param dfltUsrName Default user name.
- * @return Mapper.
- */
- private BasicUserNameMapper create(@Nullable Map<String, String> dictionary, boolean useDfltUsrName,
- @Nullable String dfltUsrName) {
- BasicUserNameMapper mapper = new BasicUserNameMapper();
-
- mapper.setMappings(dictionary);
- mapper.setUseDefaultUserName(useDfltUsrName);
- mapper.setDefaultUserName(dfltUsrName);
-
- return mapper;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java
deleted file mode 100644
index bfac49c..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.util;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import java.util.Collections;
-import java.util.concurrent.Callable;
-
-/**
- * Tests for chained user name mapper.
- */
-public class ChainedUserNameMapperSelfTest extends GridCommonAbstractTest {
- /** Test instance. */
- private static final String INSTANCE = "test_instance";
-
- /** Test realm. */
- private static final String REALM = "test_realm";
-
- /**
- * Test case when mappers are null.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testNullMappers() throws Exception {
- GridTestUtils.assertThrows(null, new Callable<Void>() {
- @Override public Void call() throws Exception {
- create((UserNameMapper[])null);
-
- return null;
- }
- }, IgniteException.class, null);
- }
-
- /**
- * Test case when one of mappers is null.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testNullMapperElement() throws Exception {
- GridTestUtils.assertThrows(null, new Callable<Void>() {
- @Override public Void call() throws Exception {
- create(new BasicUserNameMapper(), null);
-
- return null;
- }
- }, IgniteException.class, null);
- }
-
- /**
- * Test actual chaining logic.
- *
- * @throws Exception If failed.
- */
- public void testChaining() throws Exception {
- BasicUserNameMapper mapper1 = new BasicUserNameMapper();
-
- mapper1.setMappings(Collections.singletonMap("1", "101"));
-
- KerberosUserNameMapper mapper2 = new KerberosUserNameMapper();
-
- mapper2.setInstance(INSTANCE);
- mapper2.setRealm(REALM);
-
- ChainedUserNameMapper mapper = create(mapper1, mapper2);
-
- assertEquals("101" + "/" + INSTANCE + "@" + REALM, mapper.map("1"));
- assertEquals("2" + "/" + INSTANCE + "@" + REALM, mapper.map("2"));
- assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null));
- }
-
- /**
- * Create chained mapper.
- *
- * @param mappers Child mappers.
- * @return Chained mapper.
- */
- private ChainedUserNameMapper create(UserNameMapper... mappers) {
- ChainedUserNameMapper mapper = new ChainedUserNameMapper();
-
- mapper.setMappers(mappers);
-
- mapper.start();
-
- return mapper;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java
deleted file mode 100644
index cc685bb..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.util;
-
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Tests for Kerberos name mapper.
- */
-public class KerberosUserNameMapperSelfTest extends GridCommonAbstractTest {
- /** Test instance. */
- private static final String INSTANCE = "test_instance";
-
- /** Test realm. */
- private static final String REALM = "test_realm";
-
- /**
- * Test mapper without instance and realm components.
- *
- * @throws Exception If failed.
- */
- public void testMapper() throws Exception {
- KerberosUserNameMapper mapper = create(null, null);
-
- assertEquals(IgfsUtils.fixUserName(null), mapper.map(null));
- assertEquals("test", mapper.map("test"));
- }
-
- /**
- * Test mapper with instance component.
- *
- * @throws Exception If failed.
- */
- public void testMapperInstance() throws Exception {
- KerberosUserNameMapper mapper = create(INSTANCE, null);
-
- assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE, mapper.map(null));
- assertEquals("test" + "/" + INSTANCE, mapper.map("test"));
- }
-
- /**
- * Test mapper with realm.
- *
- * @throws Exception If failed.
- */
- public void testMapperRealm() throws Exception {
- KerberosUserNameMapper mapper = create(null, REALM);
-
- assertEquals(IgfsUtils.fixUserName(null) + "@" + REALM, mapper.map(null));
- assertEquals("test" + "@" + REALM, mapper.map("test"));
- }
-
- /**
- * Test mapper with instance and realm components.
- *
- * @throws Exception If failed.
- */
- public void testMapperInstanceAndRealm() throws Exception {
- KerberosUserNameMapper mapper = create(INSTANCE, REALM);
-
- assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null));
- assertEquals("test" + "/" + INSTANCE + "@" + REALM, mapper.map("test"));
- }
-
- /**
- * Create mapper.
- *
- * @param instance Instance.
- * @param realm Realm.
- * @return Mapper.
- */
- private KerberosUserNameMapper create(@Nullable String instance, @Nullable String realm) {
- KerberosUserNameMapper mapper = new KerberosUserNameMapper();
-
- mapper.setInstance(instance);
- mapper.setRealm(realm);
-
- mapper.start();
-
- return mapper;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
deleted file mode 100644
index 2c25a06..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.hadoop.util.ChainedUserNameMapper;
-import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
-import org.apache.ignite.hadoop.util.UserNameMapper;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest;
-import org.apache.ignite.lifecycle.LifecycleAware;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
-import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.SECONDARY_CFG_PATH;
-import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.configuration;
-import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.mkUri;
-import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.writeConfiguration;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-
-/**
- * Abstract test for Hadoop 1.0 file system stack.
- */
-public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest {
- /** Secondary grid name */
- private static final String GRID_NAME = "grid_secondary";
-
- /** Secondary file system name */
- private static final String IGFS_NAME = "igfs_secondary";
-
- /** Secondary file system REST endpoint port */
- private static final int PORT = 11500;
-
- /** Secondary file system REST endpoint configuration map. */
- private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration() {{
- setType(IgfsIpcEndpointType.TCP);
- setPort(PORT);
- }};
-
- /** Secondary file system authority. */
- private static final String SECONDARY_AUTHORITY = IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + PORT;
-
- /** Secondary Fs configuration full path. */
- protected String secondaryConfFullPath;
-
- /** Secondary Fs URI. */
- protected String secondaryUri;
-
- /** Constructor. */
- public Hadoop1DualAbstractTest(IgfsMode mode) {
- super(mode);
- }
-
- /**
- * Creates secondary filesystems.
- * @return IgfsSecondaryFileSystem
- * @throws Exception On failure.
- */
- @Override protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception {
- startUnderlying();
-
- prepareConfiguration();
-
- KerberosUserNameMapper mapper1 = new KerberosUserNameMapper();
-
- mapper1.setRealm("TEST.COM");
-
- TestUserNameMapper mapper2 = new TestUserNameMapper();
-
- ChainedUserNameMapper mapper = new ChainedUserNameMapper();
-
- mapper.setMappers(mapper1, mapper2);
-
- CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
-
- factory.setUri(secondaryUri);
- factory.setConfigPaths(secondaryConfFullPath);
- factory.setUserNameMapper(mapper);
-
- IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem();
-
- second.setFileSystemFactory(factory);
-
- igfsSecondary = new HadoopIgfsSecondaryFileSystemTestAdapter(factory);
-
- return second;
- }
-
- /**
- * Starts underlying Ignite process.
- * @throws IOException On failure.
- */
- protected void startUnderlying() throws Exception {
- startGridWithIgfs(GRID_NAME, IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG, secondaryIpFinder);
- }
-
- /**
- * Prepares Fs configuration.
- * @throws IOException On failure.
- */
- protected void prepareConfiguration() throws IOException {
- Configuration secondaryConf = configuration(IGFS_SCHEME, SECONDARY_AUTHORITY, true, true);
-
- secondaryConf.setInt("fs.igfs.block.size", 1024);
-
- secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH);
-
- secondaryUri = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY);
- }
-
- /**
- * Test user name mapper.
- */
- private static class TestUserNameMapper implements UserNameMapper, LifecycleAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Started flag. */
- private boolean started;
-
- /** {@inheritDoc} */
- @Nullable @Override public String map(String name) {
- assert started;
- assert name != null && name.contains("@");
-
- return name.substring(0, name.indexOf("@"));
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- started = true;
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- // No-op.
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java
deleted file mode 100644
index bbf1223..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-/**
- * DUAL_ASYNC mode test.
- */
-public class Hadoop1OverIgfsDualAsyncTest extends Hadoop1DualAbstractTest {
- /**
- * Constructor.
- */
- public Hadoop1OverIgfsDualAsyncTest() {
- super(IgfsMode.DUAL_ASYNC);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java
deleted file mode 100644
index c57415c..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-/**
- * DUAL_SYNC mode.
- */
-public class Hadoop1OverIgfsDualSyncTest extends Hadoop1DualAbstractTest {
- /**
- * Constructor.
- */
- public Hadoop1OverIgfsDualSyncTest() {
- super(IgfsMode.DUAL_SYNC);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
deleted file mode 100644
index 5be3a64..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.jetbrains.annotations.Nullable;
-import java.io.Externalizable;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.URI;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-
-/**
- * Tests for Hadoop file system factory.
- */
-public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
- /** Amount of "start" invocations */
- private static final AtomicInteger START_CNT = new AtomicInteger();
-
- /** Amount of "stop" invocations */
- private static final AtomicInteger STOP_CNT = new AtomicInteger();
-
- /** Path to secondary file system configuration. */
- private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml";
-
- /** IGFS path for DUAL mode. */
- private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir");
-
- /** IGFS path for PROXY mode. */
- private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir");
-
- /** IGFS path for DUAL mode. */
- private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir");
-
- /** IGFS path for PROXY mode. */
- private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir");
-
- /** Secondary IGFS. */
- private IgfsEx secondary;
-
- /** Primary IGFS. */
- private IgfsEx primary;
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- START_CNT.set(0);
- STOP_CNT.set(0);
-
- secondary = startSecondary();
- primary = startPrimary();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- secondary = null;
- primary = null;
-
- stopAllGrids();
- }
-
- /**
- * Test custom factory.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void testCustomFactory() throws Exception {
- assert START_CNT.get() == 1;
- assert STOP_CNT.get() == 0;
-
- // Use IGFS directly.
- primary.mkdirs(IGFS_PATH_DUAL);
-
- assert primary.exists(IGFS_PATH_DUAL);
- assert secondary.exists(IGFS_PATH_DUAL);
-
- // Create remote instance.
- FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration());
-
- // Ensure lifecycle callback was invoked.
- assert START_CNT.get() == 2;
- assert STOP_CNT.get() == 0;
-
- // Check file system operations.
- assert fs.exists(PATH_DUAL);
-
- assert fs.delete(PATH_DUAL, true);
- assert !primary.exists(IGFS_PATH_DUAL);
- assert !secondary.exists(IGFS_PATH_DUAL);
- assert !fs.exists(PATH_DUAL);
-
- assert fs.mkdirs(PATH_DUAL);
- assert primary.exists(IGFS_PATH_DUAL);
- assert secondary.exists(IGFS_PATH_DUAL);
- assert fs.exists(PATH_DUAL);
-
- assert fs.mkdirs(PATH_PROXY);
- assert secondary.exists(IGFS_PATH_PROXY);
- assert fs.exists(PATH_PROXY);
-
- // Close file system and ensure that associated factory was notified.
- fs.close();
-
- assert START_CNT.get() == 2;
- assert STOP_CNT.get() == 1;
-
- // Stop primary node and ensure that base factory was notified.
- G.stop(primary.context().kernalContext().grid().name(), true);
-
- assert START_CNT.get() == 2;
- assert STOP_CNT.get() == 2;
- }
-
- /**
- * Start secondary IGFS.
- *
- * @return IGFS.
- * @throws Exception If failed.
- */
- private static IgfsEx startSecondary() throws Exception {
- return start("secondary", 11500, IgfsMode.PRIMARY, null);
- }
-
- /**
- * Start primary IGFS.
- *
- * @return IGFS.
- * @throws Exception If failed.
- */
- private static IgfsEx startPrimary() throws Exception {
- // Prepare configuration.
- Configuration conf = baseConfiguration();
-
- conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/");
-
- writeConfigurationToFile(conf);
-
- // Configure factory.
- TestFactory factory = new TestFactory();
-
- factory.setUri("igfs://secondary:secondary@127.0.0.1:11500/");
- factory.setConfigPaths(SECONDARY_CFG_PATH);
-
- // Configure file system.
- IgniteHadoopIgfsSecondaryFileSystem fs = new IgniteHadoopIgfsSecondaryFileSystem();
-
- fs.setFileSystemFactory(factory);
-
- // Start.
- return start("primary", 10500, IgfsMode.DUAL_ASYNC, fs);
- }
-
- /**
- * Start Ignite node with IGFS instance.
- *
- * @param name Node and IGFS name.
- * @param endpointPort Endpoint port.
- * @param dfltMode Default path mode.
- * @param secondaryFs Secondary file system.
- * @return Igfs instance.
- */
- private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode,
- @Nullable IgfsSecondaryFileSystem secondaryFs) {
- IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-
- endpointCfg.setType(IgfsIpcEndpointType.TCP);
- endpointCfg.setHost("127.0.0.1");
- endpointCfg.setPort(endpointPort);
-
- FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
- igfsCfg.setDataCacheName("dataCache");
- igfsCfg.setMetaCacheName("metaCache");
- igfsCfg.setName(name);
- igfsCfg.setDefaultMode(dfltMode);
- igfsCfg.setIpcEndpointConfiguration(endpointCfg);
- igfsCfg.setSecondaryFileSystem(secondaryFs);
- igfsCfg.setInitializeDefaultPathModes(true);
-
- CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
- dataCacheCfg.setName("dataCache");
- dataCacheCfg.setCacheMode(PARTITIONED);
- dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
- dataCacheCfg.setBackups(0);
- dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
- dataCacheCfg.setOffHeapMaxMemory(0);
-
- CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
- metaCacheCfg.setName("metaCache");
- metaCacheCfg.setCacheMode(REPLICATED);
- metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- cfg.setGridName(name);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
- cfg.setDiscoverySpi(discoSpi);
- cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
- cfg.setFileSystemConfiguration(igfsCfg);
-
- cfg.setLocalHost("127.0.0.1");
- cfg.setConnectorConfiguration(null);
-
- return (IgfsEx)G.start(cfg).fileSystem(name);
- }
-
- /**
- * Create base FileSystem configuration.
- *
- * @return Configuration.
- */
- private static Configuration baseConfiguration() {
- Configuration conf = new Configuration();
-
- conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
-
- return conf;
- }
-
- /**
- * Write configuration to file.
- *
- * @param conf Configuration.
- * @throws Exception If failed.
- */
- @SuppressWarnings("ResultOfMethodCallIgnored")
- private static void writeConfigurationToFile(Configuration conf) throws Exception {
- final String path = U.getIgniteHome() + SECONDARY_CFG_PATH;
-
- File file = new File(path);
-
- file.delete();
-
- assertFalse(file.exists());
-
- try (FileOutputStream fos = new FileOutputStream(file)) {
- conf.writeXml(fos);
- }
-
- assertTrue(file.exists());
- }
-
- /**
- * Test factory.
- */
- private static class TestFactory extends CachingHadoopFileSystemFactory {
- /**
- * {@link Externalizable} support.
- */
- public TestFactory() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteException {
- START_CNT.incrementAndGet();
-
- super.start();
- }
-
- /** {@inheritDoc} */
- @Override public void stop() throws IgniteException {
- STOP_CNT.incrementAndGet();
-
- super.stop();
- }
- }
-}