You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:53:04 UTC
[12/92] [abbrv] [partial] ignite git commit: Moving classes around.
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java
deleted file mode 100644
index 3daf75d..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode.
- */
-public class IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest() {
- super(DUAL_ASYNC, false);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java
deleted file mode 100644
index a1b6189..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode.
- */
-public class IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest() {
- super(DUAL_SYNC, false);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java
deleted file mode 100644
index f704292..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in PRIMARY mode.
- */
-public class IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest() {
- super(PRIMARY, false);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java
deleted file mode 100644
index a9a7f49..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.PROXY;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in SECONDARY mode.
- */
-public class IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest() {
- super(PROXY, false);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java
deleted file mode 100644
index 1355b09..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode.
- */
-public class IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest() {
- super(DUAL_ASYNC, true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java
deleted file mode 100644
index 359c630..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode.
- */
-public class IgniteHadoopFileSystemShmemExternalDualSyncSelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemExternalDualSyncSelfTest() {
- super(DUAL_SYNC, true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java
deleted file mode 100644
index d273069..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in PRIMARY mode.
- */
-public class IgniteHadoopFileSystemShmemExternalPrimarySelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemExternalPrimarySelfTest() {
- super(PRIMARY, true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java
deleted file mode 100644
index 9c64c8d..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.PROXY;
-
-/**
- * IGFS Hadoop file system IPC shmem self test in SECONDARY mode.
- */
-public class IgniteHadoopFileSystemShmemExternalSecondarySelfTest
- extends IgniteHadoopFileSystemShmemAbstractSelfTest {
- /**
- * Constructor.
- */
- public IgniteHadoopFileSystemShmemExternalSecondarySelfTest() {
- super(PROXY, true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
new file mode 100644
index 0000000..3fa4e2d
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsUserContext;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Abstract test of whole cycle of map-reduce processing via Job tracker.
+ */
+public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
+ /** IGFS block size. */
+ protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+ /** Amount of blocks to prefetch. */
+ protected static final int PREFETCH_BLOCKS = 1;
+
+ /** Amount of sequential block reads before prefetch is triggered. */
+ protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+ /** Secondary file system URI. */
+ protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+ /** Secondary file system configuration path. */
+ protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+ /** The user to run Hadoop job on behalf of. */
+ protected static final String USER = "vasya";
+
+ /** Secondary IGFS name. */
+ protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+ /** Red constant. */
+ protected static final int red = 10_000;
+
+ /** Blue constant. */
+ protected static final int blue = 20_000;
+
+ /** Green constant. */
+ protected static final int green = 15_000;
+
+ /** Yellow constant. */
+ protected static final int yellow = 7_000;
+
+ /** The secondary Ignite node. */
+ protected Ignite igniteSecondary;
+
+ /** The secondary Fs. */
+ protected IgfsSecondaryFileSystem secondaryFs;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /**
+ * Gets owner of a IgfsEx path.
+ * @param p The path.
+ * @return The owner.
+ */
+ private static String getOwner(final IgfsEx i, final IgfsPath p) {
+ return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+ @Override public String apply() {
+ IgfsFile f = i.info(p);
+
+ assert f != null;
+
+ return f.property(IgfsUtils.PROP_USER_NAME);
+ }
+ });
+ }
+
+ /**
+ * Gets owner of a secondary Fs path.
+ * @param secFs The sec Fs.
+ * @param p The path.
+ * @return The owner.
+ */
+ private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+ return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+ @Override public String apply() {
+ return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
+ }
+ });
+ }
+
+ /**
+ * Checks owner of the path.
+ * @param p The path.
+ */
+ private void checkOwner(IgfsPath p) {
+ String ownerPrim = getOwner(igfs, p);
+ assertEquals(USER, ownerPrim);
+
+ String ownerSec = getOwnerSecondary(secondaryFs, p);
+ assertEquals(USER, ownerSec);
+ }
+
+ /**
+ * Does actual test job
+ *
+ * @param useNewMapper flag to use new mapper API.
+ * @param useNewCombiner flag to use new combiner API.
+ * @param useNewReducer flag to use new reducer API.
+ */
+ protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
+ throws Exception {
+ igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+ JobConf jobConf = new JobConf();
+
+ jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
+ jobConf.setUser(USER);
+ jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
+
+ //To split into about 40 items for v2
+ jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+ //For v1
+ jobConf.setInt("fs.local.block.size", 65000);
+
+ // File system coordinates.
+ setupFileSystems(jobConf);
+
+ HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
+
+ Job job = Job.getInstance(jobConf);
+
+ HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+ job.setJarByClass(HadoopWordCount2.class);
+
+ HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+ fut.get();
+
+ checkJobStatistics(jobId);
+
+ final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+ checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+ checkOwner(new IgfsPath(outFile));
+
+ String actual = readAndSortFile(outFile, job.getConfiguration());
+
+ assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
+ useNewReducer,
+ "blue\t" + blue + "\n" +
+ "green\t" + green + "\n" +
+ "red\t" + red + "\n" +
+ "yellow\t" + yellow + "\n",
+ actual
+ );
+ }
+
+ /**
+ * Gets if to compress output data with Snappy.
+ *
+ * @return If to compress output data with Snappy.
+ */
+ protected boolean compressOutputSnappy() {
+ return false;
+ }
+
+ /**
+ * Simple test job statistics.
+ *
+ * @param jobId Job id.
+ * @throws IgniteCheckedException
+ */
+ private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
+ HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
+
+ HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+ Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>();
+
+ Map<String, Integer> phaseOrders = new HashMap<>();
+ phaseOrders.put("submit", 0);
+ phaseOrders.put("prepare", 1);
+ phaseOrders.put("start", 2);
+ phaseOrders.put("Cstart", 3);
+ phaseOrders.put("finish", 4);
+
+ String prevTaskId = null;
+
+ long apiEvtCnt = 0;
+
+ for (T2<String, Long> evt : perfCntr.evts()) {
+ //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
+ String[] parsedEvt = evt.get1().split(" ");
+
+ String taskId;
+ String taskPhase;
+
+ if ("JOB".equals(parsedEvt[0])) {
+ taskId = parsedEvt[0];
+ taskPhase = parsedEvt[1];
+ }
+ else {
+ taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
+ taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
+ }
+
+ if (!taskId.equals(prevTaskId))
+ tasks.put(taskId, new TreeMap<Integer,Long>());
+
+ Integer pos = phaseOrders.get(taskPhase);
+
+ assertNotNull("Invalid phase " + taskPhase, pos);
+
+ tasks.get(taskId).put(pos, evt.get2());
+
+ prevTaskId = taskId;
+
+ apiEvtCnt++;
+ }
+
+ for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) {
+ Map<Integer, Long> order = task.getValue();
+
+ long prev = 0;
+
+ for (Map.Entry<Integer, Long> phase : order.entrySet()) {
+ assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
+
+ prev = phase.getValue();
+ }
+ }
+
+ final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
+
+ assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return igfs.exists(statPath);
+ }
+ }, 20_000);
+
+ final long apiEvtCnt0 = apiEvtCnt;
+
+ boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
+ return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader);
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 10000);
+
+ if (!res) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)));
+
+ assert false : "Invalid API events count [exp=" + apiEvtCnt0 +
+ ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+ super.beforeTest();
+ }
+
+ /**
+ * Start grid with IGFS.
+ *
+ * @param gridName Grid name.
+ * @param igfsName IGFS name
+ * @param mode IGFS mode.
+ * @param secondaryFs Secondary file system (optional).
+ * @param restCfg Rest configuration string (optional).
+ * @return Started grid instance.
+ * @throws Exception If failed.
+ */
+ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+ @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("dataCache");
+ igfsCfg.setMetaCacheName("metaCache");
+ igfsCfg.setName(igfsName);
+ igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+ igfsCfg.setDefaultMode(mode);
+ igfsCfg.setIpcEndpointConfiguration(restCfg);
+ igfsCfg.setSecondaryFileSystem(secondaryFs);
+ igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+ igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+ CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+ dataCacheCfg.setName("dataCache");
+ dataCacheCfg.setCacheMode(PARTITIONED);
+ dataCacheCfg.setNearConfiguration(null);
+ dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+ dataCacheCfg.setBackups(0);
+ dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+ dataCacheCfg.setOffHeapMaxMemory(0);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("metaCache");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setConnectorConfiguration(null);
+
+ HadoopConfiguration hadoopCfg = createHadoopConfiguration();
+
+ if (hadoopCfg != null)
+ cfg.setHadoopConfiguration(hadoopCfg);
+
+ return G.start(cfg);
+ }
+
+ /**
+ * Creates custom Hadoop configuration.
+ *
+ * @return The Hadoop configuration.
+ */
+ protected HadoopConfiguration createHadoopConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+ FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+ secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+ fsCfg.setSecondaryFileSystem(secondaryFs);
+
+ return fsCfg;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
new file mode 100644
index 0000000..3ea6ce4
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Abstract class for Hadoop tests.
+ */
+public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** REST port. */
+ protected static final int REST_PORT = 11212;
+
+ /** IGFS name. */
+ protected static final String igfsName = null;
+
+ /** IGFS name. */
+ protected static final String igfsMetaCacheName = "meta";
+
+ /** IGFS name. */
+ protected static final String igfsDataCacheName = "data";
+
+ /** IGFS block size. */
+ protected static final int igfsBlockSize = 1024;
+
+ /** IGFS block group size. */
+ protected static final int igfsBlockGroupSize = 8;
+
+ /** Initial REST port. */
+ private int restPort = REST_PORT;
+
+ /** Secondary file system REST endpoint configuration. */
+ protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+ static {
+ SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+ SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+ SECONDARY_REST_CFG.setPort(11500);
+ }
+
+
+ /** Initial classpath. */
+ private static String initCp;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // Add surefire classpath to regular classpath.
+ initCp = System.getProperty("java.class.path");
+
+ String surefireCp = System.getProperty("surefire.test.class.path");
+
+ if (surefireCp != null)
+ System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);
+
+ super.beforeTestsStarted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ // Restore classpath.
+ System.setProperty("java.class.path", initCp);
+
+ initCp = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setHadoopConfiguration(hadoopConfiguration(gridName));
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ if (igfsEnabled()) {
+ cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration());
+
+ cfg.setFileSystemConfiguration(igfsConfiguration());
+ }
+
+ if (restEnabled()) {
+ ConnectorConfiguration clnCfg = new ConnectorConfiguration();
+
+ clnCfg.setPort(restPort++);
+
+ cfg.setConnectorConfiguration(clnCfg);
+ }
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setPeerClassLoadingEnabled(false);
+
+ return cfg;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Hadoop configuration.
+ */
+ public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = new HadoopConfiguration();
+
+ cfg.setMaxParallelTasks(3);
+
+ return cfg;
+ }
+
+ /**
+ * @return IGFS configuration.
+ */
+ public FileSystemConfiguration igfsConfiguration() throws Exception {
+ FileSystemConfiguration cfg = new FileSystemConfiguration();
+
+ cfg.setName(igfsName);
+ cfg.setBlockSize(igfsBlockSize);
+ cfg.setDataCacheName(igfsDataCacheName);
+ cfg.setMetaCacheName(igfsMetaCacheName);
+ cfg.setFragmentizerEnabled(false);
+
+ return cfg;
+ }
+
+ /**
+ * @return IGFS meta cache configuration.
+ */
+ public CacheConfiguration metaCacheConfiguration() {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setName(igfsMetaCacheName);
+ cfg.setCacheMode(REPLICATED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return cfg;
+ }
+
+ /**
+ * @return IGFS data cache configuration.
+ */
+ private CacheConfiguration dataCacheConfiguration() {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setName(igfsDataCacheName);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize));
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return cfg;
+ }
+
+ /**
+ * @return {@code True} if IGFS is enabled on Hadoop nodes.
+ */
+ protected boolean igfsEnabled() {
+ return false;
+ }
+
+ /**
+ * @return {@code True} if REST is enabled on Hadoop nodes.
+ */
+ protected boolean restEnabled() {
+ return false;
+ }
+
+ /**
+ * @return Number of nodes to start.
+ */
+ protected int gridCount() {
+ return 3;
+ }
+
+ /**
+ * @param cfg Config.
+ */
+ protected void setupFileSystems(Configuration cfg) {
+ cfg.set("fs.defaultFS", igfsScheme());
+ cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+ cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.
+ class.getName());
+
+ HadoopFileSystemsUtils.setupFileSystems(cfg);
+ }
+
+ /**
+ * @return IGFS scheme for test.
+ */
+ protected String igfsScheme() {
+ return "igfs://:" + getTestGridName(0) + "@/";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
new file mode 100644
index 0000000..3cb8f91
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+
+/**
+ * Abstract class for tests based on WordCount test job.
+ */
+public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest {
+ /** Input path. */
+ protected static final String PATH_INPUT = "/input";
+
+ /** Output path. */
+ protected static final String PATH_OUTPUT = "/output";
+
+ /** IGFS instance. */
+ protected IgfsEx igfs;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ Configuration cfg = new Configuration();
+
+ setupFileSystems(cfg);
+
+ // Init cache by correct LocalFileSystem implementation
+ FileSystem.getLocal(cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean igfsEnabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /**
+ * Generates test file.
+ *
+ * @param path File name.
+ * @param wordCounts Words and counts.
+ * @throws Exception If failed.
+ */
+ protected void generateTestFile(String path, Object... wordCounts) throws Exception {
+ List<String> wordsArr = new ArrayList<>();
+
+ //Generating
+ for (int i = 0; i < wordCounts.length; i += 2) {
+ String word = (String) wordCounts[i];
+ int cnt = (Integer) wordCounts[i + 1];
+
+ while (cnt-- > 0)
+ wordsArr.add(word);
+ }
+
+ //Shuffling
+ for (int i = 0; i < wordsArr.size(); i++) {
+ int j = (int)(Math.random() * wordsArr.size());
+
+ Collections.swap(wordsArr, i, j);
+ }
+
+ //Input file preparing
+ PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true));
+
+ int j = 0;
+
+ while (j < wordsArr.size()) {
+ int i = 5 + (int)(Math.random() * 5);
+
+ List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+ j += i;
+
+ testInputFileWriter.println(Joiner.on(' ').join(subList));
+ }
+
+ testInputFileWriter.close();
+ }
+
+ /**
+ * Read w/o decoding (default).
+ *
+ * @param fileName The file.
+ * @return The file contents, human-readable.
+ * @throws Exception On error.
+ */
+ protected String readAndSortFile(String fileName) throws Exception {
+ return readAndSortFile(fileName, null);
+ }
+
+ /**
+ * Reads whole text file into String.
+ *
+ * @param fileName Name of the file to read.
+ * @return Content of the file as String value.
+ * @throws Exception If could not read the file.
+ */
+ protected String readAndSortFile(String fileName, Configuration conf) throws Exception {
+ final List<String> list = new ArrayList<>();
+
+ final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+ if (snappyDecode) {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(new Path(fileName)))) {
+ Text key = new Text();
+
+ IntWritable val = new IntWritable();
+
+ while (reader.next(key, val))
+ list.add(key + "\t" + val);
+ }
+ }
+ else {
+ try (InputStream is0 = igfs.open(new IgfsPath(fileName))) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is0));
+
+ String line;
+
+ while ((line = reader.readLine()) != null)
+ list.add(line);
+ }
+ }
+
+ Collections.sort(list);
+
+ return Joiner.on('\n').join(list) + "\n";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopClassLoaderTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopClassLoaderTest.java
new file mode 100644
index 0000000..9400bc2
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopClassLoaderTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import javax.security.auth.AuthPermission;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.CircularWIthHadoop;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.CircularWithoutHadoop;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithIndirectField;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithCast;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithClassAnnotation;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithConstructorInvocation;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithMethodCheckedException;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithMethodRuntimeException;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithExtends;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithField;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithImplements;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithInitializer;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithInnerClass;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithLocalVariable;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithMethodAnnotation;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithMethodInvocation;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithMethodArgument;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithMethodReturnType;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithOuterClass;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithParameterAnnotation;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithStaticField;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.WithStaticInitializer;
+import org.apache.ignite.internal.processors.hadoop.impl.deps.Without;
+
+/**
+ * Tests for Hadoop classloader.
+ */
+public class HadoopClassLoaderTest extends TestCase {
+ /** */
+ final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null, new HadoopHelperImpl());
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClassLoading() throws Exception {
+ assertNotSame(CircularWIthHadoop.class, ldr.loadClass(CircularWIthHadoop.class.getName()));
+ assertNotSame(CircularWithoutHadoop.class, ldr.loadClass(CircularWithoutHadoop.class.getName()));
+
+ assertSame(Without.class, ldr.loadClass(Without.class.getName()));
+ }
+
+ /**
+ * Test dependency search.
+ */
+ public void testDependencySearch() {
+ // Positive cases:
+ final Class[] positiveClasses = {
+ Configuration.class,
+ HadoopUtils.class,
+ WithStaticField.class,
+ WithCast.class,
+ WithClassAnnotation.class,
+ WithConstructorInvocation.class,
+ WithMethodCheckedException.class,
+ WithMethodRuntimeException.class,
+ WithExtends.class,
+ WithField.class,
+ WithImplements.class,
+ WithInitializer.class,
+ WithInnerClass.class,
+ WithOuterClass.InnerNoHadoop.class,
+ WithLocalVariable.class,
+ WithMethodAnnotation.class,
+ WithMethodInvocation.class,
+ WithMethodArgument.class,
+ WithMethodReturnType.class,
+ WithParameterAnnotation.class,
+ WithStaticField.class,
+ WithStaticInitializer.class,
+ WithIndirectField.class,
+ CircularWIthHadoop.class,
+ CircularWithoutHadoop.class,
+ };
+
+ for (Class c : positiveClasses)
+ assertTrue(c.getName(), ldr.hasExternalDependencies(c.getName()));
+
+ // Negative cases:
+ final Class[] negativeClasses = {
+ Object.class,
+ AuthPermission.class,
+ Without.class,
+ };
+
+ for (Class c : negativeClasses)
+ assertFalse(c.getName(), ldr.hasExternalDependencies(c.getName()));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
new file mode 100644
index 0000000..3c42bc2
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.igfs.IgfsInputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Test of integration with Hadoop client via command line interface.
+ */
+public class HadoopCommandLineTest extends GridCommonAbstractTest {
+ /** IGFS instance. */
+ private IgfsEx igfs;
+
+ /** */
+ private static final String igfsName = "igfs";
+
+ /** */
+ private static File testWorkDir;
+
+ /** */
+ private static String hadoopHome;
+
+ /** */
+ private static String hiveHome;
+
+ /** */
+ private static File examplesJar;
+
+ /**
+ *
+ * @param path File name.
+ * @param wordCounts Words and counts.
+ * @throws Exception If failed.
+ */
+ private void generateTestFile(File path, Object... wordCounts) throws Exception {
+ List<String> wordsArr = new ArrayList<>();
+
+ //Generating
+ for (int i = 0; i < wordCounts.length; i += 2) {
+ String word = (String) wordCounts[i];
+ int cnt = (Integer) wordCounts[i + 1];
+
+ while (cnt-- > 0)
+ wordsArr.add(word);
+ }
+
+ //Shuffling
+ for (int i = 0; i < wordsArr.size(); i++) {
+ int j = (int)(Math.random() * wordsArr.size());
+
+ Collections.swap(wordsArr, i, j);
+ }
+
+ //Writing file
+ try (PrintWriter writer = new PrintWriter(path)) {
+ int j = 0;
+
+ while (j < wordsArr.size()) {
+ int i = 5 + (int)(Math.random() * 5);
+
+ List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+ j += i;
+
+ writer.println(Joiner.on(' ').join(subList));
+ }
+
+ writer.flush();
+ }
+ }
+
+ /**
+ * Generates two data files to join its with Hive.
+ *
+ * @throws FileNotFoundException If failed.
+ */
+ private void generateHiveTestFiles() throws FileNotFoundException {
+ try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a"));
+ PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) {
+ char sep = '\t';
+
+ int idB = 0;
+ int idA = 0;
+ int v = 1000;
+
+ for (int i = 0; i < 1000; i++) {
+ writerA.print(idA++);
+ writerA.print(sep);
+ writerA.println(idB);
+
+ writerB.print(idB++);
+ writerB.print(sep);
+ writerB.println(v += 2);
+
+ writerB.print(idB++);
+ writerB.print(sep);
+ writerB.println(v += 2);
+ }
+
+ writerA.flush();
+ writerB.flush();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ hiveHome = IgniteSystemProperties.getString("HIVE_HOME");
+
+ assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));
+
+ hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");
+
+ assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));
+
+ String mapredHome = hadoopHome + "/share/hadoop/mapreduce";
+
+ File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
+ pathname.getName().endsWith(".jar");
+ }
+ });
+
+ assertEquals("Invalid hadoop distribution.", 1, fileList.length);
+
+ examplesJar = fileList[0];
+
+ testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();
+
+ U.copy(resolveHadoopConfig("core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);
+
+ File srcFile = resolveHadoopConfig("mapred-site.ignite.xml");
+ File dstFile = new File(testWorkDir, "mapred-site.xml");
+
+ try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
+ PrintWriter out = new PrintWriter(dstFile)) {
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ if (line.startsWith("</configuration>"))
+ out.println(
+ " <property>\n" +
+ " <name>" + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
+ " <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" +
+ " </property>\n");
+
+ out.println(line);
+ }
+
+ out.flush();
+ }
+
+ generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);
+
+ generateHiveTestFiles();
+ }
+
+ /**
+ * Resolve Hadoop configuration file.
+ *
+ * @param name File name.
+ * @return Resolve file.
+ */
+ private static File resolveHadoopConfig(String name) {
+ File path = U.resolveIgnitePath("modules/hadoop/config/" + name);
+
+ return path != null ? path : U.resolveIgnitePath("config/hadoop/" + name);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ U.delete(testWorkDir);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ String cfgPath = "config/hadoop/default-config.xml";
+
+ IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+ IgniteConfiguration cfg = tup.get1();
+
+ cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+ igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /**
+ * Creates the process build with appropriate environment to run Hadoop CLI.
+ *
+ * @return Process builder.
+ */
+ private ProcessBuilder createProcessBuilder() {
+ String sep = ":";
+
+ String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
+ HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
+ ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+ ProcessBuilder res = new ProcessBuilder();
+
+ res.environment().put("HADOOP_HOME", hadoopHome);
+ res.environment().put("HADOOP_CLASSPATH", ggClsPath);
+ res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());
+
+ res.redirectErrorStream(true);
+
+ return res;
+ }
+
+ /**
+ * Waits for process exit and prints the its output.
+ *
+ * @param proc Process.
+ * @return Exit code.
+ * @throws Exception If failed.
+ */
+ private int watchProcess(Process proc) throws Exception {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+ String line;
+
+ while ((line = reader.readLine()) != null)
+ log().info(line);
+
+ return proc.waitFor();
+ }
+
+ /**
+ * Executes Hadoop command line tool.
+ *
+ * @param args Arguments for Hadoop command line tool.
+ * @return Process exit code.
+ * @throws Exception If failed.
+ */
+ private int executeHadoopCmd(String... args) throws Exception {
+ ProcessBuilder procBuilder = createProcessBuilder();
+
+ List<String> cmd = new ArrayList<>();
+
+ cmd.add(hadoopHome + "/bin/hadoop");
+ cmd.addAll(Arrays.asList(args));
+
+ procBuilder.command(cmd);
+
+ log().info("Execute: " + procBuilder.command());
+
+ return watchProcess(procBuilder.start());
+ }
+
+ /**
+ * Executes Hive query.
+ *
+ * @param qry Query.
+ * @return Process exit code.
+ * @throws Exception If failed.
+ */
+ private int executeHiveQuery(String qry) throws Exception {
+ ProcessBuilder procBuilder = createProcessBuilder();
+
+ List<String> cmd = new ArrayList<>();
+
+ procBuilder.command(cmd);
+
+ cmd.add(hiveHome + "/bin/hive");
+
+ cmd.add("--hiveconf");
+ cmd.add("hive.rpc.query.plan=true");
+
+ cmd.add("--hiveconf");
+ cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
+ "databaseName=metastore_db;create=true");
+
+ cmd.add("-e");
+ cmd.add(qry);
+
+ procBuilder.command(cmd);
+
+ log().info("Execute: " + procBuilder.command());
+
+ return watchProcess(procBuilder.start());
+ }
+
+ /**
+ * Tests Hadoop command line integration.
+ */
+ public void testHadoopCommandLine() throws Exception {
+ assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));
+
+ assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));
+
+ assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));
+
+ assertTrue(igfs.exists(new IgfsPath("/input/test-data")));
+
+ assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));
+
+ IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");
+
+ assertTrue(igfs.exists(path));
+
+ IgfsPath jobStatPath = null;
+
+ for (IgfsPath jobPath : igfs.listPaths(path)) {
+ assertNull(jobStatPath);
+
+ jobStatPath = jobPath;
+ }
+
+ File locStatFile = new File(testWorkDir, "performance");
+
+ assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString()));
+
+ long evtCnt = HadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile)));
+
+ assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner.
+
+ assertTrue(igfs.exists(new IgfsPath("/output")));
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000"))));
+
+ List<String> res = new ArrayList<>();
+
+ String line;
+
+ while ((line = in.readLine()) != null)
+ res.add(line);
+
+ Collections.sort(res);
+
+ assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
+ }
+
+ /**
+ * Runs query check result.
+ *
+ * @param expRes Expected result.
+ * @param qry Query.
+ * @throws Exception If failed.
+ */
+ private void checkQuery(String expRes, String qry) throws Exception {
+ assertEquals(0, executeHiveQuery("drop table if exists result"));
+
+ assertEquals(0, executeHiveQuery(
+ "create table result " +
+ "row format delimited fields terminated by ' ' " +
+ "stored as textfile " +
+ "location '/result' as " + qry
+ ));
+
+ IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0"));
+
+ byte[] buf = new byte[(int) in.length()];
+
+ in.read(buf);
+
+ assertEquals(expRes, new String(buf));
+ }
+
+ /**
+ * Tests Hive integration.
+ */
+ public void testHiveCommandLine() throws Exception {
+ assertEquals(0, executeHiveQuery(
+ "create table table_a (" +
+ "id_a int," +
+ "id_b int" +
+ ") " +
+ "row format delimited fields terminated by '\\t'" +
+ "stored as textfile " +
+ "location '/table-a'"
+ ));
+
+ assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));
+
+ assertEquals(0, executeHiveQuery(
+ "create table table_b (" +
+ "id_b int," +
+ "rndv int" +
+ ") " +
+ "row format delimited fields terminated by '\\t'" +
+ "stored as textfile " +
+ "location '/table-b'"
+ ));
+
+ assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));
+
+ checkQuery(
+ "0 0\n" +
+ "1 2\n" +
+ "2 4\n" +
+ "3 6\n" +
+ "4 8\n" +
+ "5 10\n" +
+ "6 12\n" +
+ "7 14\n" +
+ "8 16\n" +
+ "9 18\n",
+ "select * from table_a order by id_a limit 10"
+ );
+
+ checkQuery("2000\n", "select count(id_b) from table_b");
+
+ checkQuery(
+ "250 500 2002\n" +
+ "251 502 2006\n" +
+ "252 504 2010\n" +
+ "253 506 2014\n" +
+ "254 508 2018\n" +
+ "255 510 2022\n" +
+ "256 512 2026\n" +
+ "257 514 2030\n" +
+ "258 516 2034\n" +
+ "259 518 2038\n",
+ "select a.id_a, a.id_b, b.rndv" +
+ " from table_a a" +
+ " inner join table_b b on a.id_b = b.id_b" +
+ " where b.rndv > 2000" +
+ " order by a.id_a limit 10"
+ );
+
+ checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
+ }
+}
\ No newline at end of file