You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/26 11:24:48 UTC
[07/47] ignite git commit: IGNITE-3912: Hadoop: Implemented new class
loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAbstractSelfTest.java
new file mode 100644
index 0000000..8816182
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAbstractSelfTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsInputStream;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsMetaManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
+import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.awaitFileClose;
+import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.clear;
+import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.create;
+
+/**
+ * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC.
+ */
+public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractTest {
+ /** IGFS block size. */
+ protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+ /** Amount of blocks to prefetch. */
+ protected static final int PREFETCH_BLOCKS = 1;
+
+ /** Amount of sequential block reads before prefetch is triggered. */
+ protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+ /** Secondary file system URI. */
+ protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+ /** Secondary file system configuration path. */
+ protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+ /** Primary file system URI. */
+ protected static final String PRIMARY_URI = "igfs://igfs:grid@/";
+
+ /** Primary file system configuration path. */
+ protected static final String PRIMARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback.xml";
+
+ /** Primary file system IPC endpoint configuration (TCP, port 10500). */
+ protected static final IgfsIpcEndpointConfiguration PRIMARY_REST_CFG;
+
+ /** Secondary file system IPC endpoint configuration (TCP, port 11500). */
+ protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+ /** Directory. */
+ protected static final IgfsPath DIR = new IgfsPath("/dir");
+
+ /** Sub-directory. */
+ protected static final IgfsPath SUBDIR = new IgfsPath(DIR, "subdir");
+
+ /** File. */
+ protected static final IgfsPath FILE = new IgfsPath(SUBDIR, "file");
+
+ /** Default data chunk (128 bytes). */
+ protected static byte[] chunk;
+
+ /** Primary IGFS. */
+ protected static IgfsImpl igfs;
+
+ /** Secondary IGFS. */
+ protected static IgfsImpl igfsSecondary;
+
+ /** IGFS mode. */
+ protected final IgfsMode mode;
+
+ static {
+     // Endpoint of the primary IGFS: TCP on port 10500.
+     IgfsIpcEndpointConfiguration primaryCfg = new IgfsIpcEndpointConfiguration();
+
+     primaryCfg.setType(IgfsIpcEndpointType.TCP);
+     primaryCfg.setPort(10500);
+
+     PRIMARY_REST_CFG = primaryCfg;
+
+     // Endpoint of the secondary IGFS: TCP on port 11500 (the port used in SECONDARY_URI).
+     IgfsIpcEndpointConfiguration secondaryCfg = new IgfsIpcEndpointConfiguration();
+
+     secondaryCfg.setType(IgfsIpcEndpointType.TCP);
+     secondaryCfg.setPort(11500);
+
+     SECONDARY_REST_CFG = secondaryCfg;
+ }
+
+ /**
+  * Creates the test for one of the dual modes.
+  *
+  * @param mode IGFS mode, asserted to be either {@code DUAL_SYNC} or {@code DUAL_ASYNC}.
+  */
+ protected HadoopIgfsDualAbstractSelfTest(IgfsMode mode) {
+     assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
+
+     this.mode = mode;
+ }
+
+ /**
+  * Starts an Ignite node that hosts a single IGFS instance backed by the "dataCache" and "metaCache" caches.
+  *
+  * @param gridName Grid name.
+  * @param igfsName IGFS name.
+  * @param mode Default IGFS mode.
+  * @param secondaryFs Secondary file system (optional).
+  * @param restCfg IPC endpoint configuration (optional).
+  * @return Started grid instance.
+  * @throws Exception If failed.
+  */
+ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+     @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+     // Data cache: partitioned, transactional, no backups, no near cache.
+     CacheConfiguration dataCcfg = defaultCacheConfiguration();
+
+     dataCcfg.setName("dataCache");
+     dataCcfg.setCacheMode(PARTITIONED);
+     dataCcfg.setAtomicityMode(TRANSACTIONAL);
+     dataCcfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+     dataCcfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+     dataCcfg.setNearConfiguration(null);
+     dataCcfg.setBackups(0);
+     dataCcfg.setOffHeapMaxMemory(0);
+
+     // Meta cache: replicated, transactional.
+     CacheConfiguration metaCcfg = defaultCacheConfiguration();
+
+     metaCcfg.setName("metaCache");
+     metaCcfg.setCacheMode(REPLICATED);
+     metaCcfg.setAtomicityMode(TRANSACTIONAL);
+     metaCcfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+     // File system configuration referencing the two caches above.
+     FileSystemConfiguration fsCfg = new FileSystemConfiguration();
+
+     fsCfg.setName(igfsName);
+     fsCfg.setDefaultMode(mode);
+     fsCfg.setDataCacheName("dataCache");
+     fsCfg.setMetaCacheName("metaCache");
+     fsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+     fsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+     fsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+     fsCfg.setSecondaryFileSystem(secondaryFs);
+     fsCfg.setIpcEndpointConfiguration(restCfg);
+
+     // Every node gets its own discovery SPI with a dedicated shared VM IP finder.
+     TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+     disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+     IgniteConfiguration igniteCfg = new IgniteConfiguration();
+
+     igniteCfg.setGridName(gridName);
+     igniteCfg.setLocalHost("127.0.0.1");
+     igniteCfg.setConnectorConfiguration(null);
+     igniteCfg.setDiscoverySpi(disco);
+     igniteCfg.setCacheConfiguration(dataCcfg, metaCcfg);
+     igniteCfg.setFileSystemConfiguration(fsCfg);
+
+     return G.start(igniteCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ chunk = new byte[128];
+
+ for (int i = 0; i < chunk.length; i++)
+ chunk[i] = (byte)i;
+
+ Ignite igniteSecondary = startGridWithIgfs("grid-secondary", "igfs-secondary", PRIMARY, null, SECONDARY_REST_CFG);
+
+ IgfsSecondaryFileSystem hadoopFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+ Ignite ignite = startGridWithIgfs("grid", "igfs", mode, hadoopFs, PRIMARY_REST_CFG);
+
+ igfsSecondary = (IgfsImpl) igniteSecondary.fileSystem("igfs-secondary");
+ igfs = (IgfsImpl) ignite.fileSystem("igfs");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Clear the primary IGFS first and the secondary IGFS afterwards.
+ clear(igfs);
+ clear(igfsSecondary);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // Both nodes of this test were started through G.start(), so they are stopped through G as well.
+ G.stopAll(true);
+ }
+
+ /**
+ * Convenient method to group paths: lets callers write {@code paths(a, b)} instead of an explicit array.
+ *
+ * @param paths Paths to group.
+ * @return The very array built from the varargs arguments (no copy is made).
+ */
+ protected IgfsPath[] paths(IgfsPath... paths) {
+ return paths;
+ }
+
+ /**
+ * Check how prefetch override works.
+ *
+ * @throws Exception IF failed.
+ */
+ public void testOpenPrefetchOverride() throws Exception {
+ create(igfsSecondary, paths(DIR, SUBDIR), paths(FILE));
+
+ // Write enough data to the secondary file system.
+ final int blockSize = IGFS_BLOCK_SIZE;
+
+ IgfsOutputStream out = igfsSecondary.append(FILE, false);
+
+ int totalWritten = 0;
+
+ while (totalWritten < blockSize * 2 + chunk.length) {
+ out.write(chunk);
+
+ totalWritten += chunk.length;
+ }
+
+ out.close();
+
+ awaitFileClose(igfsSecondary, FILE);
+
+ // Instantiate file system with overridden "seq reads before prefetch" property.
+ Configuration cfg = new Configuration();
+
+ cfg.addResource(U.resolveIgniteUrl(PRIMARY_CFG));
+
+ int seqReads = SEQ_READS_BEFORE_PREFETCH + 1;
+
+ cfg.setInt(String.format(PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, "igfs:grid@"), seqReads);
+
+ FileSystem fs = FileSystem.get(new URI(PRIMARY_URI), cfg);
+
+ // Read the first two blocks.
+ Path fsHome = new Path(PRIMARY_URI);
+ Path dir = new Path(fsHome, DIR.name());
+ Path subdir = new Path(dir, SUBDIR.name());
+ Path file = new Path(subdir, FILE.name());
+
+ FSDataInputStream fsIn = fs.open(file);
+
+ final byte[] readBuf = new byte[blockSize * 2];
+
+ fsIn.readFully(0, readBuf, 0, readBuf.length);
+
+ // Wait for a while for prefetch to finish (if any).
+ IgfsMetaManager meta = igfs.context().meta();
+
+ IgfsEntryInfo info = meta.info(meta.fileId(FILE));
+
+ IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2);
+
+ IgniteCache<IgfsBlockKey, byte[]> dataCache = igfs.context().kernalContext().cache().jcache(
+ igfs.configuration().getDataCacheName());
+
+ for (int i = 0; i < 10; i++) {
+ if (dataCache.containsKey(key))
+ break;
+ else
+ U.sleep(100);
+ }
+
+ fsIn.close();
+
+ // Remove the file from the secondary file system.
+ igfsSecondary.delete(FILE, false);
+
+ // Try reading the third block. Should fail.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ IgfsInputStream in0 = igfs.open(FILE);
+
+ in0.seek(blockSize * 2);
+
+ try {
+ in0.read(readBuf);
+ }
+ finally {
+ U.closeQuiet(in0);
+ }
+
+ return null;
+ }
+ }, IOException.class,
+ "Failed to read data due to secondary file system exception: /dir/subdir/file");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAsyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAsyncSelfTest.java
new file mode 100644
index 0000000..eb85700
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualAsyncSelfTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
+
+/**
+ * Runs the tests inherited from {@link HadoopIgfsDualAbstractSelfTest} with IGFS in DUAL_ASYNC mode.
+ */
+public class HadoopIgfsDualAsyncSelfTest extends HadoopIgfsDualAbstractSelfTest {
+ /**
+ * Constructor. Passes {@code DUAL_ASYNC} to the base class as the IGFS mode under test.
+ */
+ public HadoopIgfsDualAsyncSelfTest() {
+ super(DUAL_ASYNC);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualSyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualSyncSelfTest.java
new file mode 100644
index 0000000..98a2dc5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsDualSyncSelfTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+
+/**
+ * Runs the tests inherited from {@link HadoopIgfsDualAbstractSelfTest} with IGFS in DUAL_SYNC mode.
+ */
+public class HadoopIgfsDualSyncSelfTest extends HadoopIgfsDualAbstractSelfTest {
+ /**
+ * Constructor. Passes {@code DUAL_SYNC} to the base class as the IGFS mode under test.
+ */
+ public HadoopIgfsDualSyncSelfTest() {
+ super(DUAL_SYNC);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
new file mode 100644
index 0000000..adb1330
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemTestAdapter;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Test adapter exposing a Hadoop {@link org.apache.hadoop.fs.FileSystem} through the
+ * {@link IgfsSecondaryFileSystemTestAdapter} contract.
+ */
+public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFileSystemTestAdapter {
+    /** Started delegate of the file system factory. */
+    private final HadoopFileSystemFactoryDelegate factory;
+
+    /**
+     * Creates the adapter and starts the delegate of the given factory.
+     *
+     * @param factory File system factory, must not be {@code null}.
+     */
+    public HadoopIgfsSecondaryFileSystemTestAdapter(HadoopFileSystemFactory factory) {
+        assert factory != null;
+
+        HadoopFileSystemFactoryDelegate delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(factory);
+
+        delegate.start();
+
+        this.factory = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() throws IOException {
+        FileSystem fs = get();
+
+        return fs.getUri().toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(String path) throws IOException {
+        Path p = new Path(path);
+
+        return get().exists(p);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(String path, boolean recursive) throws IOException {
+        Path p = new Path(path);
+
+        return get().delete(p, recursive);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(String path) throws IOException {
+        // A "false" result from the file system is reported as a failure.
+        if (!get().mkdirs(new Path(path)))
+            throw new IOException("Failed to mkdirs: " + path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void format() throws IOException {
+        FileSystem fs = get();
+
+        HadoopIgfsUtils.clear(fs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, String> properties(String path) throws IOException {
+        FileStatus stat = get().getFileStatus(new Path(path));
+
+        // Owner, group and permission of the file are the only properties reported.
+        Map<String, String> props = new HashMap<>(3);
+
+        props.put(IgfsUtils.PROP_USER_NAME, stat.getOwner());
+        props.put(IgfsUtils.PROP_GROUP_NAME, stat.getGroup());
+        props.put(IgfsUtils.PROP_PERMISSION, permission(stat));
+
+        return props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String permissions(String path) throws IOException {
+        FileStatus stat = get().getFileStatus(new Path(path));
+
+        return permission(stat);
+    }
+
+    /**
+     * Builds the permission string of a file status: "0" followed by the ordinals of the user, group and
+     * other actions.
+     *
+     * @param status Status.
+     * @return Permission.
+     */
+    private String permission(FileStatus status) {
+        FsPermission perm = status.getPermission();
+
+        int usr = perm.getUserAction().ordinal();
+        int grp = perm.getGroupAction().ordinal();
+        int other = perm.getOtherAction().ordinal();
+
+        return "0" + usr + grp + other;
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputStream openInputStream(String path) throws IOException {
+        Path p = new Path(path);
+
+        return get().open(p);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream openOutputStream(String path, boolean append) throws IOException {
+        Path p = new Path(path);
+
+        // Either append to the file or create it, overwriting an existing one.
+        return append ? get().append(p) : get().create(p, true/*overwrite*/);
+    }
+
+    /** {@inheritDoc} */
+    @Override public T2<Long, Long> times(String path) throws IOException {
+        FileStatus stat = get().getFileStatus(new Path(path));
+
+        long accessTime = stat.getAccessTime();
+        long modificationTime = stat.getModificationTime();
+
+        return new T2<>(accessTime, modificationTime);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsEx igfs() {
+        return null;
+    }
+
+    /**
+     * Obtains the file system from the factory delegate for the default user name.
+     *
+     * @return File system.
+     * @throws IOException If failed.
+     */
+    protected FileSystem get() throws IOException {
+        Object fs = factory.get(FileSystemConfiguration.DFLT_USER_NAME);
+
+        return (FileSystem)fs;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopSecondaryFileSystemConfigurationTest.java
new file mode 100644
index 0000000..fd75233
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
+import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
+import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
+import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT;
+
+/**
+ * Tests secondary file system configuration.
+ */
+public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest {
+ /** IGFS scheme */
+ static final String IGFS_SCHEME = "igfs";
+
+ /** Primary file system authority. */
+ private static final String PRIMARY_AUTHORITY = "igfs:grid0@";
+
+ /** Autogenerated primary file system configuration path. */
+ private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml";
+
+ /** Secondary file system authority. */
+ private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500";
+
+ /** Autogenerated secondary file system configuration path. */
+ static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
+
+ /** Secondary endpoint configuration. */
+ protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG;
+
+ /** Group size. */
+ public static final int GRP_SIZE = 128;
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Primary file system URI. */
+ protected URI primaryFsUri;
+
+ /** Primary file system. */
+ private FileSystem primaryFs;
+
+ /** Full path of primary Fs configuration */
+ private String primaryConfFullPath;
+
+ /** Input primary Fs uri */
+ private String primaryFsUriStr;
+
+ /** Input URI scheme for configuration */
+ private String primaryCfgScheme;
+
+ /** Input URI authority for configuration */
+ private String primaryCfgAuthority;
+
+ /** if to pass configuration */
+ private boolean passPrimaryConfiguration;
+
+ /** Full path of secondary Fs configuration */
+ private String secondaryConfFullPath;
+
+ /** Input secondary Fs uri */
+ private String secondaryFsUriStr;
+
+ /** Input URI scheme for configuration */
+ private String secondaryCfgScheme;
+
+ /** Input URI authority for configuration */
+ private String secondaryCfgAuthority;
+
+ /** if to pass configuration */
+ private boolean passSecondaryConfiguration;
+
+ /** Default IGFS mode. */
+ protected final IgfsMode mode;
+
+ /** Skip embedded mode flag. */
+ private final boolean skipEmbed;
+
+ /** Skip local shmem flag. */
+ private final boolean skipLocShmem;
+
+ static {
+     // Endpoint of the secondary IGFS: TCP on port 11500 (the port used in SECONDARY_AUTHORITY).
+     IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+     endpointCfg.setType(IgfsIpcEndpointType.TCP);
+     endpointCfg.setPort(11500);
+
+     SECONDARY_ENDPOINT_CFG = endpointCfg;
+ }
+
+ /**
+  * Creates the test with explicit settings.
+  *
+  * @param mode Default IGFS mode.
+  * @param skipEmbed Whether to skip embedded mode.
+  * @param skipLocShmem Whether to skip local shmem mode.
+  */
+ protected HadoopSecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed, boolean skipLocShmem) {
+     this.skipLocShmem = skipLocShmem;
+     this.skipEmbed = skipEmbed;
+     this.mode = mode;
+ }
+
+ /**
+ * Default constructor: PROXY mode, embedded mode is skipped, local shmem mode is not skipped.
+ */
+ public HadoopSecondaryFileSystemConfigurationTest() {
+ this(PROXY, true, false);
+ }
+
+ /**
+  * Executes before each test: starts the nodes, writes the primary configuration file when requested and
+  * creates the primary file system through a caching factory.
+  *
+  * @throws Exception If failed.
+  */
+ private void before() throws Exception {
+     initSecondary();
+
+     // The primary configuration file is generated only when the test asked to pass it.
+     primaryConfFullPath = passPrimaryConfiguration
+         ? writeConfiguration(configuration(primaryCfgScheme, primaryCfgAuthority, skipEmbed, skipLocShmem),
+             PRIMARY_CFG_PATH)
+         : null;
+
+     CachingHadoopFileSystemFactory fsFactory = new CachingHadoopFileSystemFactory();
+
+     fsFactory.setUri(primaryFsUriStr);
+     fsFactory.setConfigPaths(primaryConfFullPath);
+
+     HadoopFileSystemFactoryDelegate delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(fsFactory);
+
+     delegate.start();
+
+     FileSystem fs = (FileSystem)delegate.get(null);
+
+     primaryFs = fs;
+     primaryFsUri = fs.getUri();
+ }
+
+ /**
+  * Executes after each test: wipes and closes the primary file system, stops all grids and removes the
+  * generated configuration files.
+  * <p>
+  * Every cleanup step runs even if a previous one fails, so that a failure to stop the grids or to delete
+  * one file does not leave the remaining resources behind.
+  *
+  * @throws Exception If failed.
+  */
+ private void after() throws Exception {
+     if (primaryFs != null) {
+         try {
+             primaryFs.delete(new Path("/"), true);
+         }
+         catch (Exception ignore) {
+             // No-op: wiping the file system is best-effort.
+         }
+
+         U.closeQuiet(primaryFs);
+     }
+
+     try {
+         G.stopAll(true);
+     }
+     finally {
+         // Remove both generated files even if stopping the grids or the first deletion failed.
+         try {
+             delete(primaryConfFullPath);
+         }
+         finally {
+             delete(secondaryConfFullPath);
+         }
+     }
+ }
+
+ /**
+  * Deletes the file with the given path and asserts that it no longer exists.
+  * Does nothing when the path is {@code null}.
+  *
+  * @param file the file path to delete.
+  */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static void delete(String file) {
+     if (file == null)
+         return;
+
+     File target = new File(file);
+
+     // The result is ignored on purpose: the assertion below is the actual check.
+     target.delete();
+
+     assertFalse(target.exists());
+ }
+
+ /**
+  * Initializes the underlying secondary file system: writes its configuration file when requested and
+  * starts the nodes.
+  *
+  * @throws Exception If failed.
+  */
+ private void initSecondary() throws Exception {
+     if (!passSecondaryConfiguration)
+         secondaryConfFullPath = null;
+     else {
+         Configuration conf = configuration(secondaryCfgScheme, secondaryCfgAuthority, true, true);
+
+         conf.setInt("fs.igfs.block.size", 1024);
+
+         secondaryConfFullPath = writeConfiguration(conf, SECONDARY_CFG_PATH);
+     }
+
+     startNodes();
+ }
+
+ /**
+ * Starts the nodes for this test: the node with the secondary IGFS (unless the mode is PRIMARY)
+ * and then the test grids via {@code startGrids(4)}.
+ *
+ * @throws Exception If failed.
+ */
+ private void startNodes() throws Exception {
+ // A secondary file system node is not started in PRIMARY mode.
+ if (mode != PRIMARY)
+ startSecondary();
+
+ startGrids(4);
+ }
+
+ /**
+  * Starts a separate node named "grid_secondary" that hosts the "igfs_secondary" file system.
+  */
+ private void startSecondary() {
+     // Data cache: partitioned, transactional, no backups, no near cache.
+     CacheConfiguration dataCcfg = defaultCacheConfiguration();
+
+     dataCcfg.setName("partitioned");
+     dataCcfg.setCacheMode(PARTITIONED);
+     dataCcfg.setAtomicityMode(TRANSACTIONAL);
+     dataCcfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+     dataCcfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
+     dataCcfg.setNearConfiguration(null);
+     dataCcfg.setBackups(0);
+
+     // Meta cache: replicated, transactional.
+     CacheConfiguration metaCcfg = defaultCacheConfiguration();
+
+     metaCcfg.setName("replicated");
+     metaCcfg.setCacheMode(REPLICATED);
+     metaCcfg.setAtomicityMode(TRANSACTIONAL);
+     metaCcfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+     // Secondary IGFS exposed through the shared secondary endpoint.
+     FileSystemConfiguration fsCfg = new FileSystemConfiguration();
+
+     fsCfg.setName("igfs_secondary");
+     fsCfg.setDataCacheName("partitioned");
+     fsCfg.setMetaCacheName("replicated");
+     fsCfg.setBlockSize(512 * 1024);
+     fsCfg.setPrefetchBlocks(1);
+     fsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG);
+
+     // The node gets its own IP finder instead of the shared IP_FINDER.
+     TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+     disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+     IgniteConfiguration igniteCfg = new IgniteConfiguration();
+
+     igniteCfg.setGridName("grid_secondary");
+     igniteCfg.setDiscoverySpi(disco);
+     igniteCfg.setCacheConfiguration(metaCcfg, dataCcfg);
+     igniteCfg.setFileSystemConfiguration(fsCfg);
+     igniteCfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+     igniteCfg.setCommunicationSpi(communicationSpi());
+
+     G.start(igniteCfg);
+ }
+
+ /**
+ * Builds the IPC endpoint configuration of the primary IGFS: a TCP endpoint whose port is
+ * {@code DFLT_IPC_PORT} shifted by the index of the given grid.
+ *
+ * @param gridName Grid name.
+ * @return IPC primary endpoint configuration.
+ */
+ protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) {
+ int port = DFLT_IPC_PORT + getTestGridIndex(gridName);
+
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.TCP);
+ endpointCfg.setPort(port);
+
+ return endpointCfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns the constant {@code "grid"}.
+ */
+ @Override public String getTestGridName() {
+ return "grid";
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(cacheConfiguration());
+ cfg.setFileSystemConfiguration(fsConfiguration(gridName));
+ cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+ cfg.setCommunicationSpi(communicationSpi());
+
+ return cfg;
+ }
+
+ /**
+ * Builds the configurations of the two caches backing the IGFS: the {@code replicated} meta cache and
+ * the {@code partitioned} data cache.
+ *
+ * @return Cache configurations, meta cache first.
+ */
+ protected CacheConfiguration[] cacheConfiguration() {
+ CacheConfiguration dataCfg = defaultCacheConfiguration();
+
+ dataCfg.setName("partitioned");
+ dataCfg.setCacheMode(PARTITIONED);
+ dataCfg.setNearConfiguration(null);
+ dataCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
+ dataCfg.setBackups(0);
+ dataCfg.setAtomicityMode(TRANSACTIONAL);
+
+ CacheConfiguration metaCfg = defaultCacheConfiguration();
+
+ metaCfg.setName("replicated");
+ metaCfg.setCacheMode(REPLICATED);
+ metaCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCfg.setAtomicityMode(TRANSACTIONAL);
+
+ CacheConfiguration[] res = {metaCfg, dataCfg};
+
+ return res;
+ }
+
+ /**
+ * Builds the configuration of the IGFS named {@code igfs} for the given grid. Unless the mode is
+ * {@code PRIMARY}, a Hadoop-based secondary file system is attached to it.
+ *
+ * @param gridName Grid name.
+ * @return IGFS configuration.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected FileSystemConfiguration fsConfiguration(String gridName) throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setName("igfs");
+ igfsCfg.setDataCacheName("partitioned");
+ igfsCfg.setMetaCacheName("replicated");
+ igfsCfg.setDefaultMode(mode);
+ igfsCfg.setPrefetchBlocks(1);
+ igfsCfg.setManagementPort(-1);
+ igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups.
+
+ boolean hasSecondary = mode != PRIMARY;
+
+ if (hasSecondary) {
+ igfsCfg.setSecondaryFileSystem(
+ new IgniteHadoopIgfsSecondaryFileSystem(secondaryFsUriStr, secondaryConfFullPath));
+ }
+
+ igfsCfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
+
+ return igfsCfg;
+ }
+
+ /** @return New TCP communication SPI with the shared memory port set to {@code -1}. */
+ private CommunicationSpi communicationSpi() {
+ TcpCommunicationSpi tcpSpi = new TcpCommunicationSpi();
+
+ tcpSpi.setSharedMemoryPort(-1);
+
+ return tcpSpi;
+ }
+
+ /**
+ * Case #SecondaryFileSystemProvider(null, path)
+ * <p>
+ * Both URI strings are left {@code null}, so the scheme and authority of each file system are
+ * supplied only through the configuration that is passed along.
+ *
+ * @throws Exception On failure.
+ */
+ public void testFsConfigurationOnly() throws Exception {
+ primaryCfgScheme = IGFS_SCHEME;
+ primaryCfgAuthority = PRIMARY_AUTHORITY;
+ passPrimaryConfiguration = true;
+ primaryFsUriStr = null; // No explicit URI.
+
+ // Correct secondary URI in the configuration, no explicit URI:
+ secondaryCfgScheme = IGFS_SCHEME;
+ secondaryCfgAuthority = SECONDARY_AUTHORITY;
+ passSecondaryConfiguration = true;
+ secondaryFsUriStr = null;
+
+ check();
+ }
+
+ /**
+ * Case #SecondaryFileSystemProvider(uri, path), when 'uri' parameter overrides
+ * the Fs uri set in the configuration.
+ * <p>
+ * The configurations deliberately carry a bogus scheme and authority, while the explicitly passed
+ * URI strings are built from the real scheme and authorities.
+ *
+ * @throws Exception On failure.
+ */
+ public void testFsUriOverridesUriInConfiguration() throws Exception {
+ // wrong primary URI in the configuration:
+ primaryCfgScheme = "foo";
+ primaryCfgAuthority = "moo:zoo@bee";
+ passPrimaryConfiguration = true;
+ primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY); // Correct URI passed explicitly.
+
+ // wrong secondary URI in the configuration:
+ secondaryCfgScheme = "foo";
+ secondaryCfgAuthority = "moo:zoo@bee";
+ passSecondaryConfiguration = true;
+ secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); // Correct URI passed explicitly.
+
+ check();
+ }
+
+ /**
+ * Performs the actual check: sets the environment up via {@code before()}, creates a file through the
+ * primary file system, writes a few bytes into it and verifies the resulting file status. The
+ * environment is always torn down via {@code after()}.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("deprecation")
+ private void check() throws Exception {
+ before();
+
+ try {
+ Path fsHome = new Path(primaryFsUri);
+ Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3");
+ Path file = new Path(dir, "someFile");
+
+ assertPathDoesNotExist(primaryFs, file);
+
+ // Octal literal (rw-r--r--); a decimal 644 would decode to unrelated permission bits.
+ FsPermission fsPerm = new FsPermission((short)0644);
+
+ // try-with-resources closes the stream even if the write fails.
+ try (FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null)) {
+ // Try to write something in file.
+ os.write("abc".getBytes());
+ }
+
+ // Check file status.
+ FileStatus fileStatus = primaryFs.getFileStatus(file);
+
+ assertFalse(fileStatus.isDir());
+ assertEquals(file, fileStatus.getPath());
+ assertEquals(fsPerm, fileStatus.getPermission());
+ }
+ finally {
+ after();
+ }
+ }
+
+ /**
+ * Creates a Hadoop configuration for the test.
+ * <p>
+ * {@code fs.defaultFS} is set only when both scheme and authority are given; the IGFS implementation
+ * classes are always registered; the endpoint flags are set only when the authority is given.
+ *
+ * @param scheme File system scheme, may be {@code null}.
+ * @param authority File system authority, may be {@code null}.
+ * @param skipEmbed Whether to skip embedded mode.
+ * @param skipLocShmem Whether to skip local shmem mode.
+ * @return Configuration.
+ */
+ static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) {
+ final Configuration conf = new Configuration();
+
+ if (scheme != null && authority != null)
+ conf.set("fs.defaultFS", mkUri(scheme, authority));
+
+ setImplClasses(conf);
+
+ if (authority == null)
+ return conf;
+
+ if (skipEmbed) {
+ String noEmbedKey = String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority);
+
+ conf.setBoolean(noEmbedKey, true);
+ }
+
+ if (skipLocShmem) {
+ String noShmemKey = String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority);
+
+ conf.setBoolean(noShmemKey, true);
+ }
+
+ return conf;
+ }
+
+ /**
+ * Registers the Ignite Hadoop file system implementation classes under the {@code fs.igfs.impl} and
+ * {@code fs.AbstractFileSystem.igfs.impl} keys.
+ *
+ * @param cfg the configuration to set parameters into.
+ */
+ static void setImplClasses(Configuration cfg) {
+ String fsImpl = IgniteHadoopFileSystem.class.getName();
+ String abstractFsImpl = org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem.class.getName();
+
+ cfg.set("fs.igfs.impl", fsImpl);
+ cfg.set("fs.AbstractFileSystem.igfs.impl", abstractFsImpl);
+ }
+
+ /**
+ * Asserts that requesting the status of the given path fails with {@link FileNotFoundException}.
+ *
+ * @param fs FileSystem to check.
+ * @param path Path to check.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ private void assertPathDoesNotExist(final FileSystem fs, final Path path) {
+ Callable<Object> statusCall = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fs.getFileStatus(path);
+ }
+ };
+
+ GridTestUtils.assertThrows(log, statusCall, FileNotFoundException.class, null);
+ }
+
+ /**
+ * Writes the configuration as XML to a file under Ignite home and returns the file's full path.
+ * Any file already present at that location is deleted first.
+ *
+ * @param cfg the configuration to write.
+ * @param pathFromIgniteHome path relatively to Ignite home; a leading slash is added when missing.
+ * @return Full path of the written configuration.
+ * @throws IOException If the file could not be written.
+ */
+ static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException {
+ String relPath = pathFromIgniteHome.startsWith("/") ? pathFromIgniteHome : "/" + pathFromIgniteHome;
+
+ final String fullPath = U.getIgniteHome() + relPath;
+
+ delete(fullPath);
+
+ File cfgFile = new File(fullPath);
+
+ try (FileOutputStream out = new FileOutputStream(cfgFile)) {
+ cfg.writeXml(out);
+ }
+
+ assertTrue(cfgFile.exists());
+
+ return fullPath;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the fixed value {@code 180000}.
+ */
+ @Override protected long getTestTimeout() {
+ return 3 * 60 * 1000; // Presumably milliseconds (3 minutes) - confirm against the base class.
+ }
+
+ /**
+ * Builds a URI string of the form {@code scheme://authority/}.
+ *
+ * @param scheme the scheme
+ * @param authority the authority
+ * @return URI String
+ */
+ static String mkUri(String scheme, String authority) {
+ StringBuilder uri = new StringBuilder();
+
+ uri.append(scheme).append("://").append(authority).append("/");
+
+ return uri.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsEventsTestSuite.java
new file mode 100644
index 0000000..6d7dc99
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsEventsTestSuite.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsEventsAbstractSelfTest;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
+import org.apache.ignite.internal.util.typedef.G;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
+import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+
+/**
+ * Test suite for IGFS event tests.
+ */
+@SuppressWarnings("PublicInnerClass")
+public class IgfsEventsTestSuite extends TestSuite {
+ /**
+ * Builds the full suite: shared memory tests followed by loopback tests, each in PRIMARY, DUAL_SYNC
+ * and DUAL_ASYNC flavors.
+ *
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ ClassLoader ldr = TestSuite.class.getClassLoader();
+
+ TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
+
+ Class<?>[] testClasses = {
+ ShmemPrimary.class,
+ ShmemDualSync.class,
+ ShmemDualAsync.class,
+ LoopbackPrimary.class,
+ LoopbackDualSync.class,
+ LoopbackDualAsync.class
+ };
+
+ for (Class<?> testCls : testClasses)
+ suite.addTest(new TestSuite(ldr.loadClass(testCls.getName())));
+
+ return suite;
+ }
+
+ /**
+ * Builds the suite restricted to the loopback tests.
+ *
+ * @return Test suite with only tests that are supported on all platforms.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suiteNoarchOnly() throws Exception {
+ ClassLoader ldr = TestSuite.class.getClassLoader();
+
+ TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only");
+
+ Class<?>[] testClasses = {
+ LoopbackPrimary.class,
+ LoopbackDualSync.class,
+ LoopbackDualAsync.class
+ };
+
+ for (Class<?> testCls : testClasses)
+ suite.addTest(new TestSuite(ldr.loadClass(testCls.getName())));
+
+ return suite;
+ }
+
+ /**
+ * Shared memory IPC in PRIMARY mode.
+ */
+ public static class ShmemPrimary extends IgfsEventsAbstractSelfTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Switches the inherited configuration to PRIMARY mode with a shared memory endpoint.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration cfg = super.getIgfsConfiguration();
+
+ IgfsIpcEndpointConfiguration shmemEndpoint = new IgfsIpcEndpointConfiguration();
+
+ shmemEndpoint.setType(IgfsIpcEndpointType.SHMEM);
+ shmemEndpoint.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1);
+
+ cfg.setDefaultMode(IgfsMode.PRIMARY);
+ cfg.setIpcEndpointConfiguration(shmemEndpoint);
+
+ return cfg;
+ }
+ }
+
+ /**
+ * Loopback socket IPC in PRIMARY mode.
+ */
+ public static class LoopbackPrimary extends IgfsEventsAbstractSelfTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Switches the inherited configuration to PRIMARY mode with a TCP endpoint.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration cfg = super.getIgfsConfiguration();
+
+ IgfsIpcEndpointConfiguration tcpEndpoint = new IgfsIpcEndpointConfiguration();
+
+ tcpEndpoint.setType(IgfsIpcEndpointType.TCP);
+ tcpEndpoint.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1);
+
+ cfg.setDefaultMode(IgfsMode.PRIMARY);
+ cfg.setIpcEndpointConfiguration(tcpEndpoint);
+
+ return cfg;
+ }
+ }
+
+ /**
+ * Base class for all IGFS tests with primary and secondary file system.
+ * <p>
+ * A separate node named {@code grid-secondary} hosting the {@code igfs-secondary} file system is
+ * started before the tests of the class; the IGFS under test is configured with a Hadoop-based
+ * secondary file system pointing at that node's endpoint.
+ */
+ public abstract static class PrimarySecondaryTest extends IgfsEventsAbstractSelfTest {
+ /** Secondary file system. Assigned in {@link #beforeTestsStarted()} and formatted after every test. */
+ private static IgniteFileSystem igfsSec;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Adds a Hadoop-based secondary file system addressing {@code igfs-secondary:grid-secondary} at
+ * {@code 127.0.0.1:11500} to the configuration produced by the parent class.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+ "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/",
+ "modules/core/src/test/config/hadoop/core-site-secondary.xml"));
+
+ return igfsCfg;
+ }
+
+ /**
+ * Builds the configuration of the secondary file system: starts from the parent class configuration
+ * (bypassing this class's own override), renames it to {@code igfs-secondary}, sets PRIMARY mode and
+ * a TCP endpoint on port 11500.
+ *
+ * @return IGFS configuration for secondary file system.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setName("igfs-secondary");
+ igfsCfg.setDefaultMode(PRIMARY);
+
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.TCP);
+ endpointCfg.setPort(11500); // Same port as in the secondary file system URI above.
+
+ igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+
+ return igfsCfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Starts the secondary node before delegating to the parent class.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ igfsSec = startSecondary();
+
+ super.beforeTestsStarted();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the parent clean-up, stops all grids.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ G.stopAll(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the parent clean-up, formats the secondary file system.
+ */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Clean up secondary file system.
+ igfsSec.format();
+ }
+
+ /**
+ * Start a grid with the secondary file system.
+ * <p>
+ * The grid is named {@code grid-secondary}, is bound to {@code 127.0.0.1} and has peer class loading
+ * disabled.
+ *
+ * @return Secondary file system handle.
+ * @throws Exception If failed.
+ */
+ @Nullable private IgniteFileSystem startSecondary() throws Exception {
+ IgniteConfiguration cfg = getConfiguration("grid-secondary", getSecondaryIgfsConfiguration());
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setPeerClassLoadingEnabled(false);
+
+ Ignite secG = G.start(cfg);
+
+ return secG.fileSystem("igfs-secondary");
+ }
+ }
+
+ /**
+ * Shared memory IPC in DUAL_SYNC mode.
+ */
+ public static class ShmemDualSync extends PrimarySecondaryTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Switches the default mode of the inherited configuration to {@code DUAL_SYNC}.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setDefaultMode(DUAL_SYNC);
+
+ return igfsCfg;
+ }
+ }
+
+ /**
+ * Shared memory IPC in DUAL_ASYNC mode.
+ */
+ public static class ShmemDualAsync extends PrimarySecondaryTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Switches the default mode of the inherited configuration to {@code DUAL_ASYNC}.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setDefaultMode(DUAL_ASYNC);
+
+ return igfsCfg;
+ }
+ }
+
+ /**
+ * Loopback socket IPC with secondary file system.
+ * <p>
+ * Differs from the parent class in the Hadoop configuration file used for the secondary file system
+ * ({@code core-site-loopback-secondary.xml}).
+ */
+ public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Sets PRIMARY as the default mode and replaces the secondary file system set by the parent class
+ * with one that uses the loopback Hadoop configuration.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setDefaultMode(IgfsMode.PRIMARY);
+
+ igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+ "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/",
+ "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
+
+ return igfsCfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Re-applies the same name, mode and TCP endpoint (port 11500) that the parent implementation
+ * already sets.
+ */
+ @Override protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getSecondaryIgfsConfiguration();
+
+ igfsCfg.setName("igfs-secondary");
+ igfsCfg.setDefaultMode(PRIMARY);
+
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.TCP);
+ endpointCfg.setPort(11500);
+
+ igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+
+ return igfsCfg;
+ }
+ }
+
+ /**
+ * Loopback socket IPC in DUAL_SYNC mode.
+ */
+ public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Switches the default mode of the inherited configuration to {@code DUAL_SYNC}.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setDefaultMode(DUAL_SYNC);
+
+ return igfsCfg;
+ }
+ }
+
+ /**
+ * Loopback socket IPC in DUAL_ASYNC mode.
+ */
+ public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Switches the default mode of the inherited configuration to {@code DUAL_ASYNC}.
+ */
+ @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
+ FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
+
+ igfsCfg.setDefaultMode(DUAL_ASYNC);
+
+ return igfsCfg;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsNearOnlyMultiNodeSelfTest.java
new file mode 100644
index 0000000..51048c6
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgfsNearOnlyMultiNodeSelfTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
+import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
+import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
+
+/**
+ * Test hadoop file system implementation.
+ */
+public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest {
+ /** Path to the default hadoop configuration. */
+ public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml";
+
+ /** Group size. */
+ public static final int GRP_SIZE = 128;
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Counter of node configurations created so far (not the number of nodes, see {@link #nodeCount()}). */
+ private int cnt;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Starts {@link #nodeCount()} grids and creates near caches for the {@code data} and {@code meta}
+ * caches on grid 0.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(nodeCount());
+
+ for (String cacheName : new String[] {"data", "meta"})
+ grid(0).createNearCache(cacheName, new NearCacheConfiguration());
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all grids.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ G.stopAll(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Adds the {@code igfs} file system backed by the {@code data} and {@code meta} caches and a shared
+ * memory endpoint. The instance counter {@code cnt} is incremented on every call: it offsets the
+ * endpoint port, and the configuration created while it is still zero is put into client mode.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true));
+
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("data");
+ igfsCfg.setMetaCacheName("meta");
+ igfsCfg.setName("igfs");
+
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.SHMEM);
+ endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt); // Distinct port per created configuration.
+
+ igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+
+ igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups.
+
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName, "data"), cacheConfiguration(gridName, "meta"));
+
+ cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+ if (cnt == 0) // The first configuration created becomes the client node.
+ cfg.setClientMode(true);
+
+ cnt++;
+
+ return cfg;
+ }
+
+ /** @return Node count for test (always 4 in this class). */
+ protected int nodeCount() {
+ return 4;
+ }
+
+ /**
+ * Builds the configuration of a partitioned, transactional cache with the given name, no backups,
+ * full synchronization and the IGFS group data blocks affinity mapper.
+ *
+ * @param gridName Grid name.
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration(String gridName, String cacheName) {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setName(cacheName);
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setBackups(0);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
+
+ return ccfg;
+ }
+
+ /**
+ * Creates a Hadoop configuration with the resource at {@link #HADOOP_FS_CFG} added to it.
+ *
+ * @return Config of concrete File System.
+ */
+ protected Configuration getFileSystemConfig() {
+ Configuration hadoopCfg = new Configuration();
+
+ hadoopCfg.addResource(U.resolveIgniteUrl(HADOOP_FS_CFG));
+
+ return hadoopCfg;
+ }
+
+ /**
+ * Builds the IGFS URI for the given grid: {@code 127.0.0.1} with the default IPC port shifted by
+ * the grid index.
+ *
+ * @param grid Grid index.
+ * @return File system URI.
+ */
+ protected URI getFileSystemURI(int grid) {
+ int port = IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid;
+
+ try {
+ return new URI("igfs://127.0.0.1:" + port);
+ }
+ catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Writes a set of files of various sizes through the file system of node 0 and then, once per
+ * remaining node, verifies the length and the first and last bytes of every file.
+ *
+ * @throws Exception If failed.
+ */
+ public void testContentsConsistency() throws Exception {
+ try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) {
+ Collection<IgniteBiTuple<String, Long>> files = F.asList(
+ F.t("/dir1/dir2/file1", 1024L),
+ F.t("/dir1/dir2/file2", 8 * 1024L),
+ F.t("/dir1/file1", 1024 * 1024L),
+ F.t("/dir1/file2", 5 * 1024 * 1024L),
+ F.t("/file1", 64 * 1024L + 13),
+ F.t("/file2", 13L),
+ F.t("/file3", 123764L)
+ );
+
+ for (IgniteBiTuple<String, Long> file : files) {
+ info("Writing file: " + file.get1());
+
+ try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) {
+ byte[] data = new byte[file.get2().intValue()];
+
+ // Marker bytes at both ends, verified when the file is read back.
+ data[0] = 25;
+ data[data.length - 1] = 26;
+
+ os.write(data);
+ }
+
+ info("Finished writing file: " + file.get1());
+ }
+
+ for (int i = 1; i < nodeCount(); i++) {
+ // NOTE(review): the file system opened for node i is only held open here; all reads below
+ // still go through 'fs' (node 0). Presumably the per-node instance was meant to be used - confirm.
+ try (FileSystem ignored = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) {
+ for (IgniteBiTuple<String, Long> file : files) {
+ Path path = new Path(file.get1());
+
+ FileStatus fileStatus = fs.getFileStatus(path);
+
+ assertEquals(file.get2(), (Long)fileStatus.getLen());
+
+ byte[] read = new byte[file.get2().intValue()];
+
+ info("Reading file: " + path);
+
+ try (FSDataInputStream in = fs.open(path)) {
+ in.readFully(read);
+
+ // JUnit assertions: the 'assert' keyword is a no-op unless the JVM runs with -ea.
+ assertEquals(25, read[0]);
+ assertEquals(26, read[read.length - 1]);
+ }
+
+ info("Finished reading file: " + path);
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file