You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/16 11:20:45 UTC
[02/51] [partial] ignite git commit: IGNITE-3916: Initial impl.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
deleted file mode 100644
index bb155b4..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
-import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
-import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
-import org.apache.ignite.internal.processors.igfs.IgfsImpl;
-import org.apache.ignite.internal.processors.igfs.IgfsMetaManager;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Callable;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.awaitFileClose;
-import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.clear;
-import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.create;
-
-/**
- * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC.
- */
-public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractTest {
- /** IGFS block size (512 KB); also used by the test to size its read buffer. */
- protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
-
- /** Amount of blocks to prefetch. */
- protected static final int PREFETCH_BLOCKS = 1;
-
- /** Amount of sequential block reads before prefetch is triggered. */
- protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
-
- /** Secondary file system URI; its port matches the one set on {@link #SECONDARY_REST_CFG}. */
- protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
-
- /** Secondary file system configuration path. */
- protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
-
- /** Primary file system URI. */
- protected static final String PRIMARY_URI = "igfs://igfs:grid@/";
-
- /** Primary file system configuration path. */
- protected static final String PRIMARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback.xml";
-
- /** Primary file system IPC endpoint configuration (TCP, port 10500); populated in the static initializer. */
- protected static final IgfsIpcEndpointConfiguration PRIMARY_REST_CFG;
-
- /** Secondary file system IPC endpoint configuration (TCP, port 11500); populated in the static initializer. */
- protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
-
- /** Directory. */
- protected static final IgfsPath DIR = new IgfsPath("/dir");
-
- /** Sub-directory of {@link #DIR}. */
- protected static final IgfsPath SUBDIR = new IgfsPath(DIR, "subdir");
-
- /** File located in {@link #SUBDIR}. */
- protected static final IgfsPath FILE = new IgfsPath(SUBDIR, "file");
-
- /** Default data chunk (128 bytes); allocated and filled in {@link #beforeTestsStarted()}. */
- protected static byte[] chunk;
-
- /** Primary IGFS. */
- protected static IgfsImpl igfs;
-
- /** Secondary IGFS. */
- protected static IgfsImpl igfsSecondary;
-
- /** IGFS mode of the primary file system; the constructor asserts it is DUAL_SYNC or DUAL_ASYNC. */
- protected final IgfsMode mode;
-
- // Both endpoints are TCP; they differ only in the port they listen on.
- static {
-     IgfsIpcEndpointConfiguration primaryEndpoint = new IgfsIpcEndpointConfiguration();
-
-     primaryEndpoint.setType(IgfsIpcEndpointType.TCP);
-     primaryEndpoint.setPort(10500);
-
-     PRIMARY_REST_CFG = primaryEndpoint;
-
-     IgfsIpcEndpointConfiguration secondaryEndpoint = new IgfsIpcEndpointConfiguration();
-
-     secondaryEndpoint.setType(IgfsIpcEndpointType.TCP);
-     secondaryEndpoint.setPort(11500);
-
-     SECONDARY_REST_CFG = secondaryEndpoint;
- }
-
- /**
-  * Creates the test for the given dual mode.
-  *
-  * @param mode IGFS mode; must be DUAL_SYNC or DUAL_ASYNC.
-  */
- protected HadoopIgfsDualAbstractSelfTest(IgfsMode mode) {
-     // Only the two dual modes are accepted by this suite.
-     assert DUAL_SYNC == mode || DUAL_ASYNC == mode;
-
-     this.mode = mode;
- }
-
- /**
-  * Starts an Ignite node that hosts one IGFS instance backed by the "dataCache" and "metaCache" caches.
-  *
-  * @param gridName Grid name.
-  * @param igfsName IGFS name.
-  * @param mode IGFS default mode.
-  * @param secondaryFs Secondary file system (optional).
-  * @param restCfg IPC endpoint configuration (optional).
-  * @return Started grid instance.
-  * @throws Exception If failed.
-  */
- protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
-     @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
-     // IGFS instance settings.
-     FileSystemConfiguration fsCfg = new FileSystemConfiguration();
-
-     fsCfg.setDataCacheName("dataCache");
-     fsCfg.setMetaCacheName("metaCache");
-     fsCfg.setName(igfsName);
-     fsCfg.setBlockSize(IGFS_BLOCK_SIZE);
-     fsCfg.setDefaultMode(mode);
-     fsCfg.setIpcEndpointConfiguration(restCfg);
-     fsCfg.setSecondaryFileSystem(secondaryFs);
-     fsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
-     fsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
-
-     // Data cache: partitioned, transactional, no backups, no near cache.
-     CacheConfiguration dataCfg = defaultCacheConfiguration();
-
-     dataCfg.setName("dataCache");
-     dataCfg.setCacheMode(PARTITIONED);
-     dataCfg.setNearConfiguration(null);
-     dataCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-     dataCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
-     dataCfg.setBackups(0);
-     dataCfg.setAtomicityMode(TRANSACTIONAL);
-     dataCfg.setOffHeapMaxMemory(0);
-
-     // Meta cache: replicated, transactional.
-     CacheConfiguration metaCfg = defaultCacheConfiguration();
-
-     metaCfg.setName("metaCache");
-     metaCfg.setCacheMode(REPLICATED);
-     metaCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-     metaCfg.setAtomicityMode(TRANSACTIONAL);
-
-     // Node configuration.
-     IgniteConfiguration nodeCfg = new IgniteConfiguration();
-
-     nodeCfg.setGridName(gridName);
-
-     TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-     disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-     nodeCfg.setDiscoverySpi(disco);
-     nodeCfg.setCacheConfiguration(dataCfg, metaCfg);
-     nodeCfg.setFileSystemConfiguration(fsCfg);
-
-     nodeCfg.setLocalHost("127.0.0.1");
-     nodeCfg.setConnectorConfiguration(null);
-
-     return G.start(nodeCfg);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- chunk = new byte[128];
-
- for (int i = 0; i < chunk.length; i++)
- chunk[i] = (byte)i;
-
- Ignite igniteSecondary = startGridWithIgfs("grid-secondary", "igfs-secondary", PRIMARY, null, SECONDARY_REST_CFG);
-
- IgfsSecondaryFileSystem hadoopFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
-
- Ignite ignite = startGridWithIgfs("grid", "igfs", mode, hadoopFs, PRIMARY_REST_CFG);
-
- igfsSecondary = (IgfsImpl) igniteSecondary.fileSystem("igfs-secondary");
- igfs = (IgfsImpl) ignite.fileSystem("igfs");
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Clears the primary IGFS first and the secondary IGFS second.
- */
- @Override protected void afterTest() throws Exception {
- clear(igfs);
- clear(igfsSecondary);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Stops all grids, i.e. both the primary and the secondary node started by {@link #beforeTestsStarted()}.
- */
- @Override protected void afterTestsStopped() throws Exception {
- G.stopAll(true);
- }
-
- /**
- * Convenience method to group paths: returns the varargs array it was given, unchanged.
- *
- * @param paths Paths to group.
- * @return The same paths as an array.
- */
- protected IgfsPath[] paths(IgfsPath... paths) {
- return paths;
- }
-
- /**
-  * Checks how the "sequential reads before prefetch" override works.
-  * <p>
-  * The test writes slightly more than two blocks to the secondary file system, reads the first two
-  * blocks through a Hadoop client whose configuration raises the threshold to
-  * {@code SEQ_READS_BEFORE_PREFETCH + 1}, removes the file from the secondary file system and then
-  * expects a read of the third block through the primary IGFS to fail.
-  *
-  * @throws Exception If failed.
-  */
- public void testOpenPrefetchOverride() throws Exception {
-     create(igfsSecondary, paths(DIR, SUBDIR), paths(FILE));
-
-     // Write enough data to the secondary file system.
-     final int blockSize = IGFS_BLOCK_SIZE;
-
-     // try-with-resources: the stream is closed even when a write fails.
-     try (IgfsOutputStream out = igfsSecondary.append(FILE, false)) {
-         int totalWritten = 0;
-
-         while (totalWritten < blockSize * 2 + chunk.length) {
-             out.write(chunk);
-
-             totalWritten += chunk.length;
-         }
-     }
-
-     awaitFileClose(igfsSecondary, FILE);
-
-     // Instantiate file system with overridden "seq reads before prefetch" property.
-     Configuration cfg = new Configuration();
-
-     cfg.addResource(U.resolveIgniteUrl(PRIMARY_CFG));
-
-     int seqReads = SEQ_READS_BEFORE_PREFETCH + 1;
-
-     cfg.setInt(String.format(PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, "igfs:grid@"), seqReads);
-
-     final byte[] readBuf = new byte[blockSize * 2];
-
-     FileSystem fs = FileSystem.get(new URI(PRIMARY_URI), cfg);
-
-     try {
-         // Read the first two blocks.
-         Path fsHome = new Path(PRIMARY_URI);
-         Path dir = new Path(fsHome, DIR.name());
-         Path subdir = new Path(dir, SUBDIR.name());
-         Path file = new Path(subdir, FILE.name());
-
-         // The input stream is closed even when the read or the polling below fails.
-         try (FSDataInputStream fsIn = fs.open(file)) {
-             fsIn.readFully(0, readBuf, 0, readBuf.length);
-
-             // Wait for a while for prefetch to finish (if any).
-             IgfsMetaManager meta = igfs.context().meta();
-
-             IgfsEntryInfo info = meta.info(meta.fileId(FILE));
-
-             // Key of the third block (index 2), i.e. the one a prefetch would bring in.
-             IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2);
-
-             IgniteCache<IgfsBlockKey, byte[]> dataCache = igfs.context().kernalContext().cache().jcache(
-                 igfs.configuration().getDataCacheName());
-
-             // Poll up to 10 times, 100 ms apart.
-             for (int i = 0; i < 10; i++) {
-                 if (dataCache.containsKey(key))
-                     break;
-                 else
-                     U.sleep(100);
-             }
-         }
-
-         // Remove the file from the secondary file system.
-         igfsSecondary.delete(FILE, false);
-
-         // Try reading the third block. Should fail.
-         GridTestUtils.assertThrows(log, new Callable<Object>() {
-             @Override public Object call() throws Exception {
-                 IgfsInputStream in0 = igfs.open(FILE);
-
-                 in0.seek(blockSize * 2);
-
-                 try {
-                     in0.read(readBuf);
-                 }
-                 finally {
-                     U.closeQuiet(in0);
-                 }
-
-                 return null;
-             }
-         }, IOException.class,
-             "Failed to read data due to secondary file system exception: /dir/subdir/file");
-     }
-     finally {
-         // Release the Hadoop client; closed quietly so a close failure cannot mask a test failure.
-         U.closeQuiet(fs);
-     }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java
deleted file mode 100644
index 6c6e709..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-
-/**
- * Tests for DUAL_ASYNC mode: runs the tests inherited from {@link HadoopIgfsDualAbstractSelfTest}
- * with DUAL_ASYNC as the IGFS mode.
- */
-public class HadoopIgfsDualAsyncSelfTest extends HadoopIgfsDualAbstractSelfTest {
- /**
- * Constructor; passes DUAL_ASYNC to the base class.
- */
- public HadoopIgfsDualAsyncSelfTest() {
- super(DUAL_ASYNC);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java
deleted file mode 100644
index 96a63d5..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
-
-/**
- * Tests for DUAL_SYNC mode: runs the tests inherited from {@link HadoopIgfsDualAbstractSelfTest}
- * with DUAL_SYNC as the IGFS mode.
- */
-public class HadoopIgfsDualSyncSelfTest extends HadoopIgfsDualAbstractSelfTest {
- /**
- * Constructor; passes DUAL_SYNC to the base class.
- */
- public HadoopIgfsDualSyncSelfTest() {
- super(DUAL_SYNC);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
deleted file mode 100644
index f7af6f0..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemTestAdapter;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- * Test adapter that exposes a Hadoop {@link org.apache.hadoop.fs.FileSystem}, obtained from a
- * {@link HadoopFileSystemFactory}, through the {@link IgfsSecondaryFileSystemTestAdapter} interface.
- */
-public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFileSystemTestAdapter {
-    /** File system factory. */
-    private final HadoopFileSystemFactory factory;
-
-    /**
-     * Constructor.
-     *
-     * @param factory File system factory; asserted to be non-null.
-     */
-    public HadoopIgfsSecondaryFileSystemTestAdapter(HadoopFileSystemFactory factory) {
-        assert factory != null;
-
-        this.factory = factory;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() throws IOException {
-        FileSystem fs = get();
-
-        return fs.getUri().toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean exists(String path) throws IOException {
-        FileSystem fs = get();
-
-        return fs.exists(new Path(path));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(String path, boolean recursive) throws IOException {
-        FileSystem fs = get();
-
-        return fs.delete(new Path(path), recursive);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(String path) throws IOException {
-        // A "false" result from Hadoop is reported as a failure.
-        if (!get().mkdirs(new Path(path)))
-            throw new IOException("Failed to mkdirs: " + path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void format() throws IOException {
-        FileSystem fs = get();
-
-        HadoopIgfsUtils.clear(fs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<String, String> properties(String path) throws IOException {
-        Path target = new Path(path);
-
-        FileSystem fs = get();
-
-        FileStatus st = fs.getFileStatus(target);
-
-        // Exactly three entries: owner, group and permission.
-        Map<String, String> props = new HashMap<>(3);
-
-        props.put(IgfsUtils.PROP_USER_NAME, st.getOwner());
-        props.put(IgfsUtils.PROP_GROUP_NAME, st.getGroup());
-        props.put(IgfsUtils.PROP_PERMISSION, permission(st));
-
-        return props;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String permissions(String path) throws IOException {
-        FileSystem fs = get();
-
-        FileStatus st = fs.getFileStatus(new Path(path));
-
-        return permission(st);
-    }
-
-    /**
-     * Builds the permission string for a file status: "0" followed by the ordinals of the user,
-     * group and other actions.
-     *
-     * @param status Status.
-     * @return Permission.
-     */
-    private String permission(FileStatus status) {
-        FsPermission perm = status.getPermission();
-
-        int user = perm.getUserAction().ordinal();
-        int grp = perm.getGroupAction().ordinal();
-        int other = perm.getOtherAction().ordinal();
-
-        return "0" + user + grp + other;
-    }
-
-    /** {@inheritDoc} */
-    @Override public InputStream openInputStream(String path) throws IOException {
-        FileSystem fs = get();
-
-        return fs.open(new Path(path));
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream openOutputStream(String path, boolean append) throws IOException {
-        Path target = new Path(path);
-
-        FileSystem fs = get();
-
-        // Either append to the existing file or create it, overwriting any previous content.
-        return append ? fs.append(target) : fs.create(target, true/*overwrite*/);
-    }
-
-    /** {@inheritDoc} */
-    @Override public T2<Long, Long> times(String path) throws IOException {
-        FileSystem fs = get();
-
-        FileStatus st = fs.getFileStatus(new Path(path));
-
-        // First component is the access time, second is the modification time.
-        return new T2<>(st.getAccessTime(), st.getModificationTime());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsEx igfs() {
-        // This adapter wraps a Hadoop file system, so there is no IGFS instance to return.
-        return null;
-    }
-
-    /**
-     * Obtains the file system from the factory for the default user name.
-     *
-     * @return File system.
-     * @throws IOException If failed.
-     */
-    protected FileSystem get() throws IOException {
-        return factory.get(FileSystemConfiguration.DFLT_USER_NAME);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
deleted file mode 100644
index d9b5d66..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Callable;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
-import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
-import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT;
-
-/**
- * Tests secondary file system configuration.
- */
-public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest {
- /** IGFS scheme. */
- static final String IGFS_SCHEME = "igfs";
-
- /** Primary file system authority. */
- private static final String PRIMARY_AUTHORITY = "igfs:grid0@";
-
- /** Autogenerated primary file system configuration path. */
- private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml";
-
- /** Secondary file system authority. */
- private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500";
-
- /** Autogenerated secondary file system configuration path. */
- static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
-
- /** Secondary endpoint configuration (TCP, port 11500); populated in the static initializer. */
- protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG;
-
- /** Group size passed to the data cache affinity mapper. */
- public static final int GRP_SIZE = 128;
-
- /** IP finder shared by the grids started through {@link #getConfiguration(String)}. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Primary file system URI, as reported by the primary file system itself. */
- protected URI primaryFsUri;
-
- /** Primary file system. */
- private FileSystem primaryFs;
-
- /** Full path of the primary FS configuration file, or {@code null} if no configuration is passed. */
- private String primaryConfFullPath;
-
- /** Input primary FS URI, handed to the file system factory. */
- private String primaryFsUriStr;
-
- /** Input URI scheme for the primary FS configuration. */
- private String primaryCfgScheme;
-
- /** Input URI authority for the primary FS configuration. */
- private String primaryCfgAuthority;
-
- /** Whether to generate and pass the primary FS configuration. */
- private boolean passPrimaryConfiguration;
-
- /** Full path of the secondary FS configuration file, or {@code null} if no configuration is passed. */
- private String secondaryConfFullPath;
-
- /** Input secondary FS URI, handed to the secondary file system. */
- private String secondaryFsUriStr;
-
- /** Input URI scheme for the secondary FS configuration. */
- private String secondaryCfgScheme;
-
- /** Input URI authority for the secondary FS configuration. */
- private String secondaryCfgAuthority;
-
- /** Whether to generate and pass the secondary FS configuration. */
- private boolean passSecondaryConfiguration;
-
- /** Default IGFS mode. */
- protected final IgfsMode mode;
-
- /** Skip embedded mode flag. */
- private final boolean skipEmbed;
-
- /** Skip local shmem flag. */
- private final boolean skipLocShmem;
-
- // The secondary IGFS endpoint is TCP on port 11500.
- static {
-     IgfsIpcEndpointConfiguration endpoint = new IgfsIpcEndpointConfiguration();
-
-     endpoint.setType(IgfsIpcEndpointType.TCP);
-     endpoint.setPort(11500);
-
-     SECONDARY_ENDPOINT_CFG = endpoint;
- }
-
- /**
- * Constructor. Stores the three settings; nothing is started here.
- *
- * @param mode Default IGFS mode.
- * @param skipEmbed Whether to skip embedded mode.
- * @param skipLocShmem Whether to skip local shmem mode.
- */
- protected HadoopSecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed, boolean skipLocShmem) {
- this.mode = mode;
- this.skipEmbed = skipEmbed;
- this.skipLocShmem = skipLocShmem;
- }
-
- /**
- * Default constructor: PROXY mode, embedded mode skipped, local shmem mode not skipped.
- */
- public HadoopSecondaryFileSystemConfigurationTest() {
- this(PROXY, true, false);
- }
-
- /**
-  * Executes before each test: initializes the secondary file system and the nodes, optionally
-  * generates the primary FS configuration file, then opens the primary file system through a
-  * {@link CachingHadoopFileSystemFactory}.
-  *
-  * @throws Exception If failed.
-  */
- private void before() throws Exception {
-     initSecondary();
-
-     // The configuration file is generated only when the test asked for it.
-     primaryConfFullPath = passPrimaryConfiguration
-         ? writeConfiguration(configuration(primaryCfgScheme, primaryCfgAuthority, skipEmbed, skipLocShmem),
-             PRIMARY_CFG_PATH)
-         : null;
-
-     CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
-
-     factory.setConfigPaths(primaryConfFullPath);
-     factory.setUri(primaryFsUriStr);
-
-     factory.start();
-
-     FileSystem fs = factory.get(null);
-
-     primaryFs = fs;
-     primaryFsUri = fs.getUri();
- }
-
- /**
- * Executes after each test: wipes and closes the primary file system (if one was opened), stops
- * all grids and removes the generated configuration files.
- *
- * @throws Exception If failed.
- */
- private void after() throws Exception {
- if (primaryFs != null) {
- try {
- primaryFs.delete(new Path("/"), true);
- }
- catch (Exception ignore) {
- // No-op: clean-up is best effort, the file system is closed below regardless.
- }
-
- U.closeQuiet(primaryFs);
- }
-
- G.stopAll(true);
-
- // Either path may be null when the corresponding configuration was not generated.
- delete(primaryConfFullPath);
- delete(secondaryConfFullPath);
- }
-
- /**
-  * Deletes the given file and asserts that it no longer exists. Does nothing for a {@code null} path.
-  *
-  * @param file the file path to delete.
-  */
- @SuppressWarnings("ResultOfMethodCallIgnored")
- private static void delete(String file) {
-     if (file == null)
-         return;
-
-     File target = new File(file);
-
-     // The result is ignored on purpose; the assertion below is the actual check.
-     target.delete();
-
-     assertFalse(target.exists());
- }
-
- /**
-  * Initializes the underlying secondary file system: optionally generates its configuration file,
-  * then starts the nodes.
-  *
-  * @throws Exception If failed.
-  */
- private void initSecondary() throws Exception {
-     String confPath = null;
-
-     if (passSecondaryConfiguration) {
-         Configuration conf = configuration(secondaryCfgScheme, secondaryCfgAuthority, true, true);
-
-         conf.setInt("fs.igfs.block.size", 1024);
-
-         confPath = writeConfiguration(conf, SECONDARY_CFG_PATH);
-     }
-
-     secondaryConfFullPath = confPath;
-
-     startNodes();
- }
-
- /**
- * Starts the nodes for this test: the secondary IGFS node (unless the mode is PRIMARY) followed
- * by four test grids.
- *
- * @throws Exception If failed.
- */
- private void startNodes() throws Exception {
- if (mode != PRIMARY)
- startSecondary();
-
- startGrids(4);
- }
-
- /**
- * Starts the secondary IGFS: a separate node named "grid_secondary" hosting the "igfs_secondary"
- * file system on {@link #SECONDARY_ENDPOINT_CFG}.
- * <p>
- * NOTE(review): the two cache configurations built here repeat what {@link #cacheConfiguration()}
- * builds; confirm whether the duplication is intentional before sharing the code.
- */
- private void startSecondary() {
- FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
- igfsCfg.setDataCacheName("partitioned");
- igfsCfg.setMetaCacheName("replicated");
- igfsCfg.setName("igfs_secondary");
- igfsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG);
- igfsCfg.setBlockSize(512 * 1024);
- igfsCfg.setPrefetchBlocks(1);
-
- // Data cache.
- CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
- cacheCfg.setName("partitioned");
- cacheCfg.setCacheMode(PARTITIONED);
- cacheCfg.setNearConfiguration(null);
- cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
- cacheCfg.setBackups(0);
- cacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- // Meta cache.
- CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
- metaCacheCfg.setName("replicated");
- metaCacheCfg.setCacheMode(REPLICATED);
- metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- cfg.setGridName("grid_secondary");
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- // A fresh IP finder instance rather than the shared IP_FINDER constant.
- discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
- cfg.setDiscoverySpi(discoSpi);
- cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
- cfg.setFileSystemConfiguration(igfsCfg);
- cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
- cfg.setCommunicationSpi(communicationSpi());
-
- G.start(cfg);
- }
-
- /**
-  * Builds the primary IPC endpoint configuration: TCP, on the default IPC port shifted by the
-  * index of the given grid.
-  *
-  * @param gridName Grid name.
-  * @return IPC primary endpoint configuration.
-  */
- protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) {
-     IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-
-     endpointCfg.setType(IgfsIpcEndpointType.TCP);
-
-     int port = DFLT_IPC_PORT + getTestGridIndex(gridName);
-
-     endpointCfg.setPort(port);
-
-     return endpointCfg;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Always returns the fixed name "grid".
- */
- @Override public String getTestGridName() {
- return "grid";
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-     IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
-
-     // All test grids discover each other through the shared IP finder.
-     TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-     disco.setIpFinder(IP_FINDER);
-
-     nodeCfg.setDiscoverySpi(disco);
-     nodeCfg.setCacheConfiguration(cacheConfiguration());
-     nodeCfg.setFileSystemConfiguration(fsConfiguration(gridName));
-     nodeCfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-     nodeCfg.setCommunicationSpi(communicationSpi());
-
-     return nodeCfg;
- }
-
- /**
-  * Gets cache configuration: the "replicated" meta cache followed by the "partitioned" data cache.
-  *
-  * @return Cache configuration.
-  */
- protected CacheConfiguration[] cacheConfiguration() {
-     // Data cache: partitioned, transactional, no backups, no near cache.
-     CacheConfiguration dataCfg = defaultCacheConfiguration();
-
-     dataCfg.setName("partitioned");
-     dataCfg.setCacheMode(PARTITIONED);
-     dataCfg.setNearConfiguration(null);
-     dataCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-     dataCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
-     dataCfg.setBackups(0);
-     dataCfg.setAtomicityMode(TRANSACTIONAL);
-
-     // Meta cache: replicated, transactional.
-     CacheConfiguration metaCfg = defaultCacheConfiguration();
-
-     metaCfg.setName("replicated");
-     metaCfg.setCacheMode(REPLICATED);
-     metaCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-     metaCfg.setAtomicityMode(TRANSACTIONAL);
-
-     CacheConfiguration[] cfgs = {metaCfg, dataCfg};
-
-     return cfgs;
- }
-
- /**
- * Gets IGFS configuration.
- *
- * @param gridName Grid name.
- * @return IGFS configuration.
- */
- protected FileSystemConfiguration fsConfiguration(String gridName) throws IgniteCheckedException {
- FileSystemConfiguration cfg = new FileSystemConfiguration();
-
- cfg.setDataCacheName("partitioned");
- cfg.setMetaCacheName("replicated");
- cfg.setName("igfs");
- cfg.setPrefetchBlocks(1);
- cfg.setDefaultMode(mode);
-
- if (mode != PRIMARY)
- cfg.setSecondaryFileSystem(
- new IgniteHadoopIgfsSecondaryFileSystem(secondaryFsUriStr, secondaryConfFullPath));
-
- cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
-
- cfg.setManagementPort(-1);
- cfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups.
-
- return cfg;
- }
-
- /** @return Communication SPI. */
- private CommunicationSpi communicationSpi() {
- TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
- commSpi.setSharedMemoryPort(-1);
-
- return commSpi;
- }
-
- /**
- * Case #SecondaryFileSystemProvider(null, path): no explicit URI string is given,
- * so only the scheme and authority stored in the configuration are available.
- * @throws Exception On failure.
- */
- public void testFsConfigurationOnly() throws Exception {
- // Primary: IGFS scheme and primary authority go into the configuration, no URI string.
- primaryFsUriStr = null;
- passPrimaryConfiguration = true;
- primaryCfgScheme = IGFS_SCHEME;
- primaryCfgAuthority = PRIMARY_AUTHORITY;
- // Secondary: IGFS scheme and secondary authority go into the configuration, no URI string.
- secondaryFsUriStr = null;
- passSecondaryConfiguration = true;
- secondaryCfgScheme = IGFS_SCHEME;
- secondaryCfgAuthority = SECONDARY_AUTHORITY;
-
- check();
- }
-
- /**
- * Case #SecondaryFileSystemProvider(uri, path): the explicit 'uri' argument must win
- * over the Fs uri stored in the configuration.
- *
- * @throws Exception On failure.
- */
- public void testFsUriOverridesUriInConfiguration() throws Exception {
- // Deliberately wrong scheme and authority, written into both configurations.
- final String bogusScheme = "foo";
- final String bogusAuthority = "moo:zoo@bee";
- primaryCfgScheme = bogusScheme;
- primaryCfgAuthority = bogusAuthority;
- passPrimaryConfiguration = true;
- primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY);
- secondaryCfgScheme = bogusScheme;
- secondaryCfgAuthority = bogusAuthority;
- passSecondaryConfiguration = true;
- secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY);
-
- check();
- }
-
- /**
- * Runs the shared scenario: creates a file on the primary file system, writes to it and verifies its status.
- * {@code before()} and {@code after()} bracket the scenario; {@code after()} runs even when a check fails.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("deprecation")
- private void check() throws Exception {
- before();
-
- try {
- Path fsHome = new Path(primaryFsUri);
- Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3");
- Path file = new Path(dir, "someFile");
-
- assertPathDoesNotExist(primaryFs, file);
-
- // Octal literal: rw-r--r--. The decimal 644 used before encodes the unrelated bit pattern 01204.
- FsPermission fsPerm = new FsPermission((short)0644);
-
- // Try to write something in file; the stream is closed even if the write fails.
- try (FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null)) {
- os.write("abc".getBytes());
- }
-
- // Check file status.
- FileStatus fileStatus = primaryFs.getFileStatus(file);
-
- assertFalse(fileStatus.isDir());
- assertEquals(file, fileStatus.getPath());
- assertEquals(fsPerm, fileStatus.getPermission());
- }
- finally {
- after();
- }
- }
-
- /**
- * Create configuration for test.
- * @param scheme URI scheme for {@code fs.defaultFS}; may be {@code null}.
- * @param authority URI authority; may be {@code null}, in which case no endpoint flags are set.
- * @param skipEmbed Whether to skip embedded mode.
- * @param skipLocShmem Whether to skip local shmem mode.
- * @return Configuration.
- */
- static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) {
- final Configuration hadoopCfg = new Configuration();
- final boolean hasAuthority = authority != null;
-
- if (hasAuthority && scheme != null)
- hadoopCfg.set("fs.defaultFS", mkUri(scheme, authority));
-
- setImplClasses(hadoopCfg);
-
- if (hasAuthority && skipEmbed)
- hadoopCfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority), true);
-
- if (hasAuthority && skipLocShmem)
- hadoopCfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true);
-
- return hadoopCfg;
- }
-
- /**
- * Sets Hadoop Fs implementation classes.
- *
- * @param cfg the configuration to set parameters into.
- */
- static void setImplClasses(Configuration cfg) {
- cfg.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
-
- cfg.set("fs.AbstractFileSystem.igfs.impl",
- org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem.class.getName());
- }
-
- /**
- * Check path does not exist in a given FileSystem.
- *
- * @param fs FileSystem to check.
- * @param path Path to check.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- private void assertPathDoesNotExist(final FileSystem fs, final Path path) {
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return fs.getFileStatus(path);
- }
- }, FileNotFoundException.class, null);
- }
-
- /**
- * Serializes the configuration as XML to a file under Ignite home and returns the file's path.
- *
- * @param cfg the configuration to write.
- * @param pathFromIgniteHome path relatively to Ignite home; a leading "/" is added when missing.
- * @return Full path of the written configuration.
- * @throws IOException If the file could not be opened or written.
- */
- static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException {
- final String relPath = pathFromIgniteHome.startsWith("/") ? pathFromIgniteHome : "/" + pathFromIgniteHome;
- final String fullPath = U.getIgniteHome() + relPath;
-
- delete(fullPath);
-
- final File target = new File(fullPath);
-
- try (FileOutputStream out = new FileOutputStream(target)) {
- cfg.writeXml(out);
- }
-
- assertTrue(target.exists());
-
- return fullPath;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 3L * 60L * 1000L; // Three minutes, expressed in milliseconds.
- }
-
- /**
- * Composes a URI string of the form {@code scheme://authority/}.
- *
- * @param scheme the scheme
- * @param authority the authority
- * @return URI String
- */
- static String mkUri(String scheme, String authority) {
- return String.format("%s://%s/", scheme, authority);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
deleted file mode 100644
index a9d7bad..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import junit.framework.TestSuite;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
-import org.apache.ignite.internal.util.typedef.G;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-
-/**
- * Test suite for IGFS event tests.
- */
-@SuppressWarnings("PublicInnerClass")
-public class IgfsEventsTestSuite extends TestSuite {
- /**
- * @return Test suite with the shared memory and the loopback test classes.
- * @throws Exception Thrown in case of the failure.
- */
- public static TestSuite suite() throws Exception {
- final ClassLoader clsLdr = TestSuite.class.getClassLoader();
- final TestSuite all = new TestSuite("Ignite FS Events Test Suite");
-
- // Shared memory variants first, then loopback ones; the registration order is kept.
- final Class<?>[] testClasses = {
- ShmemPrimary.class, ShmemDualSync.class, ShmemDualAsync.class,
- LoopbackPrimary.class, LoopbackDualSync.class, LoopbackDualAsync.class
- };
-
- for (Class<?> testCls : testClasses)
- all.addTest(new TestSuite(clsLdr.loadClass(testCls.getName())));
-
- return all;
- }
-
- /**
- * @return Test suite restricted to the loopback test classes, which are supported on all platforms.
- * @throws Exception Thrown in case of the failure.
- */
- public static TestSuite suiteNoarchOnly() throws Exception {
- final ClassLoader clsLdr = TestSuite.class.getClassLoader();
- final TestSuite noarch = new TestSuite("Ignite IGFS Events Test Suite Noarch Only");
-
- final Class<?>[] loopbackTests = {LoopbackPrimary.class, LoopbackDualSync.class, LoopbackDualAsync.class};
-
- for (Class<?> testCls : loopbackTests)
- noarch.addTest(new TestSuite(clsLdr.loadClass(testCls.getName())));
-
- return noarch;
- }
-
- /**
- * Shared memory IPC in PRIMARY mode.
- */
- public static class ShmemPrimary extends IgfsEventsAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration fsCfg = super.getIgfsConfiguration();
-
- // Shared memory endpoint on the port next to the default IPC port.
- final IgfsIpcEndpointConfiguration shmemEndpoint = new IgfsIpcEndpointConfiguration();
-
- shmemEndpoint.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1);
- shmemEndpoint.setType(IgfsIpcEndpointType.SHMEM);
-
- fsCfg.setIpcEndpointConfiguration(shmemEndpoint);
- fsCfg.setDefaultMode(PRIMARY);
-
- return fsCfg;
- }
- }
-
- /**
- * Loopback socket IPC in PRIMARY mode.
- */
- public static class LoopbackPrimary extends IgfsEventsAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration fsCfg = super.getIgfsConfiguration();
-
- // TCP endpoint on the port next to the default IPC port.
- final IgfsIpcEndpointConfiguration tcpEndpoint = new IgfsIpcEndpointConfiguration();
-
- tcpEndpoint.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1);
- tcpEndpoint.setType(IgfsIpcEndpointType.TCP);
-
- fsCfg.setIpcEndpointConfiguration(tcpEndpoint);
- fsCfg.setDefaultMode(PRIMARY);
-
- return fsCfg;
- }
- }
-
- /**
- * Base for IGFS event tests that attach a secondary file system to the primary one.
- */
- public abstract static class PrimarySecondaryTest extends IgfsEventsAbstractSelfTest {
- /** Handle of the secondary file system; assigned in {@link #beforeTestsStarted()}. */
- private static IgniteFileSystem igfsSec;
-
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration fsCfg = super.getIgfsConfiguration();
- final IgniteHadoopIgfsSecondaryFileSystem secFs = new IgniteHadoopIgfsSecondaryFileSystem(
- "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/",
- "modules/core/src/test/config/hadoop/core-site-secondary.xml");
-
- fsCfg.setSecondaryFileSystem(secFs);
- return fsCfg;
- }
-
- /**
- * @return Configuration of the file system named "igfs-secondary": PRIMARY mode, TCP endpoint on port 11500.
- */
- protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration secCfg = super.getIgfsConfiguration();
-
- secCfg.setDefaultMode(PRIMARY);
- secCfg.setName("igfs-secondary");
-
- final IgfsIpcEndpointConfiguration tcpEndpoint = new IgfsIpcEndpointConfiguration();
-
- tcpEndpoint.setPort(11500);
- tcpEndpoint.setType(IgfsIpcEndpointType.TCP);
-
- secCfg.setIpcEndpointConfiguration(tcpEndpoint);
-
- return secCfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- // The secondary grid is started before the superclass hook runs.
- igfsSec = startSecondary();
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- G.stopAll(true);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- // Reset the secondary file system after each test.
- igfsSec.format();
- }
-
- /**
- * Starts the "grid-secondary" grid configured with the secondary IGFS.
- *
- * @return Secondary file system handle.
- * @throws Exception If failed.
- */
- @Nullable private IgniteFileSystem startSecondary() throws Exception {
- final IgniteConfiguration secGridCfg = getConfiguration("grid-secondary", getSecondaryIgfsConfiguration());
-
- secGridCfg.setPeerClassLoadingEnabled(false);
- secGridCfg.setLocalHost("127.0.0.1");
-
- final Ignite secGrid = G.start(secGridCfg);
-
- return secGrid.fileSystem("igfs-secondary");
- }
- }
-
- /**
- * DUAL_SYNC variant named for shared memory IPC; the endpoint itself is not configured in this class.
- */
- public static class ShmemDualSync extends PrimarySecondaryTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration dualSyncCfg = super.getIgfsConfiguration();
-
- dualSyncCfg.setDefaultMode(DUAL_SYNC);
-
- return dualSyncCfg;
- }
- }
-
- /**
- * DUAL_ASYNC variant named for shared memory IPC; the endpoint itself is not configured in this class.
- */
- public static class ShmemDualAsync extends PrimarySecondaryTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration dualAsyncCfg = super.getIgfsConfiguration();
-
- dualAsyncCfg.setDefaultMode(DUAL_ASYNC);
-
- return dualAsyncCfg;
- }
- }
-
- /**
- * Loopback socket IPC with secondary file system.
- */
- public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration fsCfg = super.getIgfsConfiguration();
-
- // Replaces the secondary file system installed by the parent with the loopback configuration.
- final IgniteHadoopIgfsSecondaryFileSystem loopbackSecFs = new IgniteHadoopIgfsSecondaryFileSystem(
- "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/",
- "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml");
- fsCfg.setDefaultMode(PRIMARY);
- fsCfg.setSecondaryFileSystem(loopbackSecFs);
- return fsCfg;
- }
-
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration secCfg = super.getSecondaryIgfsConfiguration();
- // Name, mode and endpoint below repeat the values the parent implementation already applies.
- secCfg.setDefaultMode(PRIMARY);
- secCfg.setName("igfs-secondary");
-
- final IgfsIpcEndpointConfiguration tcpEndpoint = new IgfsIpcEndpointConfiguration();
-
- tcpEndpoint.setPort(11500);
- tcpEndpoint.setType(IgfsIpcEndpointType.TCP);
-
- secCfg.setIpcEndpointConfiguration(tcpEndpoint);
-
- return secCfg;
- }
- }
-
- /**
- * Loopback socket IPC in DUAL_SYNC mode.
- */
- public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration dualSyncCfg = super.getIgfsConfiguration();
-
- dualSyncCfg.setDefaultMode(DUAL_SYNC); // Overrides the PRIMARY mode set by the parent class.
-
- return dualSyncCfg;
- }
- }
-
- /**
- * Loopback socket IPC in DUAL_ASYNC mode.
- */
- public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest {
- /** {@inheritDoc} */
- @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
- final FileSystemConfiguration dualAsyncCfg = super.getIgfsConfiguration();
-
- dualAsyncCfg.setDefaultMode(DUAL_ASYNC); // Overrides the PRIMARY mode set by the parent class.
-
- return dualAsyncCfg;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
deleted file mode 100644
index 8e79356..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs;
-
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
-import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
-import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
-
-/**
- * Writes files through the IGFS endpoint at index 0 and reads them back through every other node's endpoint.
- */
-public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest {
- /** Path to the default hadoop configuration. */
- public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml";
-
- /** Group size. */
- public static final int GRP_SIZE = 128;
-
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Number of configurations created so far; selects the IPC port and marks the first node as client. */
- private int cnt;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- startGrids(nodeCount());
-
- grid(0).createNearCache("data", new NearCacheConfiguration());
-
- grid(0).createNearCache("meta", new NearCacheConfiguration());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- G.stopAll(true);
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true));
-
- FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
- igfsCfg.setDataCacheName("data");
- igfsCfg.setMetaCacheName("meta");
- igfsCfg.setName("igfs");
-
- IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
-
- endpointCfg.setType(IgfsIpcEndpointType.SHMEM);
- endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt);
-
- igfsCfg.setIpcEndpointConfiguration(endpointCfg);
-
- igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups.
-
- cfg.setFileSystemConfiguration(igfsCfg);
-
- cfg.setCacheConfiguration(cacheConfiguration(gridName, "data"), cacheConfiguration(gridName, "meta"));
-
- cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
- // Only the first configuration created is switched to client mode.
- if (cnt == 0)
- cfg.setClientMode(true);
-
- cnt++;
-
- return cfg;
- }
-
- /** @return Node count for test. */
- protected int nodeCount() {
- return 4;
- }
-
- /**
- * Gets cache configuration.
- * @param gridName Grid name.
- * @param cacheName Name assigned to the cache.
- * @return Cache configuration.
- */
- protected CacheConfiguration cacheConfiguration(String gridName, String cacheName) {
- CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
- cacheCfg.setName(cacheName);
- cacheCfg.setCacheMode(PARTITIONED);
- cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
- cacheCfg.setBackups(0);
- cacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- return cacheCfg;
- }
-
- /**
- * Gets config of concrete File System.
- *
- * @return Config of concrete File System, loaded from {@link #HADOOP_FS_CFG}.
- */
- protected Configuration getFileSystemConfig() {
- Configuration cfg = new Configuration();
-
- cfg.addResource(U.resolveIgniteUrl(HADOOP_FS_CFG));
-
- return cfg;
- }
-
- /**
- * Gets the URI of the IGFS endpoint of a grid.
- *
- * @param grid Grid index; added to the default IPC port.
- * @return File System URI on 127.0.0.1.
- */
- protected URI getFileSystemURI(int grid) {
- try {
- return new URI("igfs://127.0.0.1:" + (IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid));
- }
- catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
-
- /** @throws Exception If failed. */
- public void testContentsConsistency() throws Exception {
- try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) {
- Collection<IgniteBiTuple<String, Long>> files = F.asList(
- F.t("/dir1/dir2/file1", 1024L),
- F.t("/dir1/dir2/file2", 8 * 1024L),
- F.t("/dir1/file1", 1024 * 1024L),
- F.t("/dir1/file2", 5 * 1024 * 1024L),
- F.t("/file1", 64 * 1024L + 13),
- F.t("/file2", 13L),
- F.t("/file3", 123764L)
- );
-
- for (IgniteBiTuple<String, Long> file : files) {
- // Write phase: every file is created through the endpoint at index 0.
- info("Writing file: " + file.get1());
-
- try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) {
- byte[] data = new byte[file.get2().intValue()];
-
- data[0] = 25;
- data[data.length - 1] = 26;
-
- os.write(data);
- }
-
- info("Finished writing file: " + file.get1());
- }
-
- for (int i = 1; i < nodeCount(); i++) {
- // Read phase: go through node i's own endpoint; reading via 'fs' would only re-check endpoint 0.
- try (FileSystem nodeFs = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) {
- for (IgniteBiTuple<String, Long> file : files) {
- Path path = new Path(file.get1());
-
- FileStatus fileStatus = nodeFs.getFileStatus(path);
-
- assertEquals(file.get2(), (Long)fileStatus.getLen());
-
- byte[] read = new byte[file.get2().intValue()];
-
- info("Reading file: " + path);
-
- try (FSDataInputStream in = nodeFs.open(path)) {
- in.readFully(read);
-
- assertEquals(25, read[0]);
- assertEquals(26, read[read.length - 1]);
- }
-
- info("Finished reading file: " + path);
- }
- }
- }
- }
- }
-}
\ No newline at end of file