You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/11 16:34:45 UTC
[12/16] ignite git commit: IGNITE-2206: Hadoop file system creation
is now abstracted out using factory interface.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 99ca1ec..0d7de86 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -17,22 +17,6 @@
package org.apache.ignite.hadoop.fs.v2;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.InvalidPathException;
@@ -51,13 +36,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
@@ -74,8 +60,26 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
@@ -92,8 +96,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
/**
* {@code IGFS} Hadoop 2.x file system driver over file system API. To use
@@ -168,8 +170,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** Mode resolver. */
private IgfsModeResolver modeRslvr;
- /** Secondary file system instance. */
- private AbstractFileSystem secondaryFs;
+ /** The secondary file system factory. */
+ private HadoopFileSystemFactory factory;
/** Whether custom sequential reads before prefetch value is provided. */
private boolean seqReadsBeforePrefetchOverride;
@@ -335,20 +337,27 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
}
if (initSecondary) {
- Map<String, String> props = paths.properties();
+ try {
+ factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to get secondary file system factory.", e);
+ }
- String secUri = props.get(SECONDARY_FS_URI);
- String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
+ assert factory != null;
+
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware) factory).start();
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+ FileSystem secFs = factory.get(user);
- secondaryFs = secProvider.createAbstractFileSystem(user);
+ secondaryUri = secFs.getUri();
- secondaryUri = secProvider.uri();
+ A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
}
catch (IOException e) {
- throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
+ throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
}
}
}
@@ -368,6 +377,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.close();
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware) factory).stop();
+
// Reset initialized resources.
rmtClient = null;
}
@@ -391,13 +403,13 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** {@inheritDoc} */
@Override public boolean setReplication(Path f, short replication) throws IOException {
- return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
+ return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
}
/** {@inheritDoc} */
@Override public void setTimes(Path f, long mtime, long atime) throws IOException {
if (mode(f) == PROXY)
- secondaryFs.setTimes(f, mtime, atime);
+ secondaryFileSystem().setTimes(f, mtime, atime);
else {
if (mtime == -1 && atime == -1)
return;
@@ -421,7 +433,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
A.notNull(p, "p");
if (mode(p) == PROXY)
- secondaryFs.setPermission(toSecondary(p), perm);
+ secondaryFileSystem().setPermission(toSecondary(p), perm);
else {
if (rmtClient.update(convert(p), permission(perm)) == null)
throw new IOException("Failed to set file permission (file not found?)" +
@@ -443,7 +455,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
if (mode(p) == PROXY)
- secondaryFs.setOwner(toSecondary(p), usr, grp);
+ secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
throw new IOException("Failed to set file permission (file not found?)" +
" [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
@@ -464,11 +476,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
IgfsMode mode = modeRslvr.resolveMode(path);
if (mode == PROXY) {
- FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
+ FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
if (clientLog.isLogEnabled()) {
// At this point we do not know file size, so we perform additional request to remote FS to get it.
- FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
+ FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
long size = status != null ? status.getLen() : -1;
@@ -543,8 +555,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
if (mode == PROXY) {
- FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
- replication, blockSize, progress, checksumOpt, createParent);
+ FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
+ replication, blockSize, progress);
if (clientLog.isLogEnabled()) {
long logId = IgfsLogger.nextId();
@@ -641,7 +653,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.logRename(srcPath, PROXY, dstPath);
- secondaryFs.renameInternal(toSecondary(src), toSecondary(dst));
+ secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
}
else {
if (clientLog.isLogEnabled())
@@ -671,7 +683,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.logDelete(path, PROXY, recursive);
- return secondaryFs.delete(toSecondary(f), recursive);
+ return secondaryFileSystem().delete(toSecondary(f), recursive);
}
boolean res = rmtClient.delete(path, recursive);
@@ -689,14 +701,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** {@inheritDoc} */
@Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
// Checksum has effect for secondary FS only.
- if (secondaryFs != null)
- secondaryFs.setVerifyChecksum(verifyChecksum);
+ if (factory != null)
+ secondaryFileSystem().setVerifyChecksum(verifyChecksum);
}
/** {@inheritDoc} */
@Override public FileChecksum getFileChecksum(Path f) throws IOException {
if (mode(f) == PROXY)
- return secondaryFs.getFileChecksum(f);
+ return secondaryFileSystem().getFileChecksum(f);
return null;
}
@@ -712,7 +724,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
IgfsMode mode = modeRslvr.resolveMode(path);
if (mode == PROXY) {
- FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
+ FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
if (arr == null)
throw new FileNotFoundException("File " + f + " does not exist.");
@@ -775,7 +787,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
if (clientLog.isLogEnabled())
clientLog.logMakeDirectory(path, PROXY);
- secondaryFs.mkdir(toSecondary(f), perm, createParent);
+ secondaryFileSystem().mkdirs(toSecondary(f), perm);
}
else {
rmtClient.mkdirs(path, permission(perm));
@@ -797,7 +809,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
if (mode(f) == PROXY)
- return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
+ return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
else {
IgfsFile info = rmtClient.info(convert(f));
@@ -822,7 +834,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
try {
if (modeRslvr.resolveMode(igfsPath) == PROXY)
- return secondaryFs.getFileBlockLocations(path, start, len);
+ return secondaryFileSystem().getFileBlockLocations(path, start, len);
else {
long now = System.currentTimeMillis();
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
* @return Secondary file system path.
*/
private Path toSecondary(Path path) {
- assert secondaryFs != null;
+ assert factory != null;
assert secondaryUri != null;
return convertPath(path, secondaryUri);
@@ -1045,4 +1057,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
public String user() {
return user;
}
+
+ /**
+ * Gets cached or creates a {@link FileSystem}.
+ *
+ * @return The secondary file system.
+ */
+ private FileSystem secondaryFileSystem() throws IOException{
+ assert factory != null;
+
+ return factory.get(user);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
deleted file mode 100644
index d5be074..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Encapsulates logic of secondary filesystem creation.
- */
-public class SecondaryFileSystemProvider {
- /** Configuration of the secondary filesystem, never null. */
- private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
-
- /** The secondary filesystem URI, never null. */
- private final URI uri;
-
- /**
- * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
- * specified either explicitly or in the configuration provided.
- *
- * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS"
- * property in the provided configuration.
- * @param secConfPath the secondary Fs path (file path on the local file system, optional).
- * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
- * @throws IOException
- */
- public SecondaryFileSystemProvider(final @Nullable String secUri,
- final @Nullable String secConfPath) throws IOException {
- if (secConfPath != null) {
- URL url = U.resolveIgniteUrl(secConfPath);
-
- if (url == null) {
- // If secConfPath is given, it should be resolvable:
- throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
- "(ensure that it exists locally and you have read access to it): " + secConfPath);
- }
-
- cfg.addResource(url);
- }
-
- // if secondary fs URI is not given explicitly, try to get it from the configuration:
- if (secUri == null)
- uri = FileSystem.getDefaultUri(cfg);
- else {
- try {
- uri = new URI(secUri);
- }
- catch (URISyntaxException use) {
- throw new IOException("Failed to resolve secondary file system URI: " + secUri);
- }
- }
-
- // Disable caching:
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
-
- cfg.setBoolean(prop, true);
- }
-
- /**
- * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
- * @throws IOException
- */
- public FileSystem createFileSystem(String userName) throws IOException {
- userName = IgfsUtils.fixUserName(userName);
-
- final FileSystem fileSys;
-
- try {
- fileSys = FileSystem.get(uri, cfg, userName);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
-
- return fileSys;
- }
-
- /**
- * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
- * @throws IOException in case of error.
- */
- public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
- userName = IgfsUtils.fixUserName(userName);
-
- String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-
- UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
-
- try {
- return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
- @Override public AbstractFileSystem run() throws IOException {
- return AbstractFileSystem.get(uri, cfg);
- }
- });
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", ie);
- }
- }
-
- /**
- * @return the secondary fs URI, never null.
- */
- public URI uri() {
- return uri;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
index 48ade79..1ecbee5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
@@ -39,7 +39,7 @@ public class HadoopFileSystemCacheUtils {
public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
return new HadoopLazyConcurrentMap<>(
new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
- @Override public FileSystem createValue(FsCacheKey key) {
+ @Override public FileSystem createValue(FsCacheKey key) throws IOException {
try {
assert key != null;
@@ -57,8 +57,10 @@ public class HadoopFileSystemCacheUtils {
return FileSystem.get(uri, cfg, key.user());
}
- catch (IOException | InterruptedException ioe) {
- throw new IgniteException(ioe);
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index 89eaf73..681cddb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.hadoop.fs;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
@@ -204,8 +205,8 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
*
* @param key the key to create value for
* @return the value.
- * @throws IgniteException on failure.
+ * @throws IOException On failure.
*/
- public V createValue(K key);
+ public V createValue(K key) throws IOException;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
index ea65464..10b1bcd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.igfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest;
@@ -74,12 +74,16 @@ public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest {
prepareConfiguration();
- IgniteHadoopIgfsSecondaryFileSystem second =
- new IgniteHadoopIgfsSecondaryFileSystem(secondaryUri, secondaryConfFullPath);
+ CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
- FileSystem fileSystem = second.fileSystem();
+ factory.setUri(secondaryUri);
+ factory.setConfigPaths(secondaryConfFullPath);
- igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(fileSystem);
+ IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem();
+
+ second.setFileSystemFactory(factory);
+
+ igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(factory);
return second;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
new file mode 100644
index 0000000..1d02f0f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import java.io.Externalizable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests for Hadoop file system factory.
+ */
+public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
+ /** Amount of "start" invocations */
+ private static final AtomicInteger START_CNT = new AtomicInteger();
+
+ /** Amount of "stop" invocations */
+ private static final AtomicInteger STOP_CNT = new AtomicInteger();
+
+ /** Path to secondary file system configuration. */
+ private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml";
+
+ /** IGFS path for DUAL mode. */
+ private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir");
+
+ /** IGFS path for PROXY mode. */
+ private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir");
+
+ /** IGFS path for DUAL mode. */
+ private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir");
+
+ /** IGFS path for PROXY mode. */
+ private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir");
+
+ /** Secondary IGFS. */
+ private IgfsEx secondary;
+
+ /** Primary IGFS. */
+ private IgfsEx primary;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ START_CNT.set(0);
+ STOP_CNT.set(0);
+
+ secondary = startSecondary();
+ primary = startPrimary();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ secondary = null;
+ primary = null;
+
+ stopAllGrids();
+ }
+
+ /**
+ * Test custom factory.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public void testCustomFactory() throws Exception {
+ assert START_CNT.get() == 1;
+ assert STOP_CNT.get() == 0;
+
+ // Use IGFS directly.
+ primary.mkdirs(IGFS_PATH_DUAL);
+
+ assert primary.exists(IGFS_PATH_DUAL);
+ assert secondary.exists(IGFS_PATH_DUAL);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ primary.mkdirs(IGFS_PATH_PROXY);
+
+ return null;
+ }
+ }, IgfsInvalidPathException.class, null);
+
+ // Create remote instance.
+ FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration());
+
+ // Ensure lifecycle callback was invoked.
+ assert START_CNT.get() == 2;
+ assert STOP_CNT.get() == 0;
+
+ // Check file system operations.
+ assert fs.exists(PATH_DUAL);
+
+ assert fs.delete(PATH_DUAL, true);
+ assert !primary.exists(IGFS_PATH_DUAL);
+ assert !secondary.exists(IGFS_PATH_DUAL);
+ assert !fs.exists(PATH_DUAL);
+
+ assert fs.mkdirs(PATH_DUAL);
+ assert primary.exists(IGFS_PATH_DUAL);
+ assert secondary.exists(IGFS_PATH_DUAL);
+ assert fs.exists(PATH_DUAL);
+
+ assert fs.mkdirs(PATH_PROXY);
+ assert secondary.exists(IGFS_PATH_PROXY);
+ assert fs.exists(PATH_PROXY);
+
+ // Close file system and ensure that associated factory was notified.
+ fs.close();
+
+ assert START_CNT.get() == 2;
+ assert STOP_CNT.get() == 1;
+
+ // Stop primary node and ensure that base factory was notified.
+ G.stop(primary.context().kernalContext().grid().name(), true);
+
+ assert START_CNT.get() == 2;
+ assert STOP_CNT.get() == 2;
+ }
+
+ /**
+ * Start secondary IGFS.
+ *
+ * @return IGFS.
+ * @throws Exception If failed.
+ */
+ private static IgfsEx startSecondary() throws Exception {
+ return start("secondary", 11500, IgfsMode.PRIMARY, null);
+ }
+
+ /**
+ * Start primary IGFS.
+ *
+ * @return IGFS.
+ * @throws Exception If failed.
+ */
+ private static IgfsEx startPrimary() throws Exception {
+ // Prepare configuration.
+ Configuration conf = baseConfiguration();
+
+ conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/");
+
+ writeConfigurationToFile(conf);
+
+ // Configure factory.
+ TestFactory factory = new TestFactory();
+
+ factory.setUri("igfs://secondary:secondary@127.0.0.1:11500/");
+ factory.setConfigPaths(SECONDARY_CFG_PATH);
+
+ // Configure file system.
+ IgniteHadoopIgfsSecondaryFileSystem fs = new IgniteHadoopIgfsSecondaryFileSystem();
+
+ fs.setFileSystemFactory(factory);
+
+ // Start.
+ return start("primary", 10500, IgfsMode.PRIMARY, fs);
+ }
+
+ /**
+ * Start Ignite node with IGFS instance.
+ *
+ * @param name Node and IGFS name.
+ * @param endpointPort Endpoint port.
+ * @param dfltMode Default path mode.
+ * @param secondaryFs Secondary file system.
+ * @return Igfs instance.
+ */
+ private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode,
+ @Nullable IgfsSecondaryFileSystem secondaryFs) {
+ IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+ endpointCfg.setType(IgfsIpcEndpointType.TCP);
+ endpointCfg.setHost("127.0.0.1");
+ endpointCfg.setPort(endpointPort);
+
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("dataCache");
+ igfsCfg.setMetaCacheName("metaCache");
+ igfsCfg.setName(name);
+ igfsCfg.setDefaultMode(dfltMode);
+ igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+ igfsCfg.setSecondaryFileSystem(secondaryFs);
+
+ CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+ dataCacheCfg.setName("dataCache");
+ dataCacheCfg.setCacheMode(PARTITIONED);
+ dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+ dataCacheCfg.setBackups(0);
+ dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+ dataCacheCfg.setOffHeapMaxMemory(0);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("metaCache");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName(name);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setConnectorConfiguration(null);
+
+ return (IgfsEx)G.start(cfg).fileSystem(name);
+ }
+
+ /**
+ * Create base FileSystem configuration.
+ *
+ * @return Configuration.
+ */
+ private static Configuration baseConfiguration() {
+ Configuration conf = new Configuration();
+
+ conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+ return conf;
+ }
+
+ /**
+ * Write configuration to file.
+ *
+ * @param conf Configuration.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static void writeConfigurationToFile(Configuration conf) throws Exception {
+ final String path = U.getIgniteHome() + SECONDARY_CFG_PATH;
+
+ File file = new File(path);
+
+ file.delete();
+
+ assertFalse(file.exists());
+
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ conf.writeXml(fos);
+ }
+
+ assertTrue(file.exists());
+ }
+
+ /**
+ * Test factory.
+ */
+ private static class TestFactory extends CachingHadoopFileSystemFactory {
+ /**
+ * {@link Externalizable} support.
+ */
+ public TestFactory() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ START_CNT.incrementAndGet();
+
+ super.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ STOP_CNT.incrementAndGet();
+
+ super.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
index 608bd25..5b6fd81 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
@@ -34,55 +36,55 @@ import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
* Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance.
*/
public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFileSystemAdapter {
- /** The wrapped filesystem. */
- private final FileSystem fileSys;
+ /** File system factory. */
+ private final HadoopFileSystemFactory factory;
/**
* Constructor.
- * @param fs the filesystem to be wrapped.
+ * @param factory File system factory.
*/
- public HadoopFileSystemUniversalFileSystemAdapter(FileSystem fs) {
- this.fileSys = fs;
+ public HadoopFileSystemUniversalFileSystemAdapter(HadoopFileSystemFactory factory) {
+ assert factory != null;
+
+ this.factory = factory;
}
/** {@inheritDoc} */
- @Override public String name() {
- return fileSys.getUri().toString();
+ @Override public String name() throws IOException {
+ return get().getUri().toString();
}
/** {@inheritDoc} */
@Override public boolean exists(String path) throws IOException {
- return fileSys.exists(new Path(path));
+ return get().exists(new Path(path));
}
/** {@inheritDoc} */
@Override public boolean delete(String path, boolean recursive) throws IOException {
- boolean ok = fileSys.delete(new Path(path), recursive);
- return ok;
+ return get().delete(new Path(path), recursive);
}
/** {@inheritDoc} */
@Override public void mkdirs(String path) throws IOException {
- boolean ok = fileSys.mkdirs(new Path(path));
+ boolean ok = get().mkdirs(new Path(path));
if (!ok)
throw new IOException("Failed to mkdirs: " + path);
}
/** {@inheritDoc} */
@Override public void format() throws IOException {
- HadoopIgfsUtils.clear(fileSys);
+ HadoopIgfsUtils.clear(get());
}
/** {@inheritDoc} */
@Override public Map<String, String> properties(String path) throws IOException {
Path p = new Path(path);
- FileStatus status = fileSys.getFileStatus(p);
+ FileStatus status = get().getFileStatus(p);
Map<String,String> m = new HashMap<>(3); // max size == 4
m.put(IgfsEx.PROP_USER_NAME, status.getOwner());
-
m.put(IgfsEx.PROP_GROUP_NAME, status.getGroup());
FsPermission perm = status.getPermission();
@@ -95,7 +97,7 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
/** {@inheritDoc} */
@Override public InputStream openInputStream(String path) throws IOException {
- return fileSys.open(new Path(path));
+ return get().open(new Path(path));
}
/** {@inheritDoc} */
@@ -103,16 +105,27 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
Path p = new Path(path);
if (append)
- return fileSys.append(p);
+ return get().append(p);
else
- return fileSys.create(p, true/*overwrite*/);
+ return get().create(p, true/*overwrite*/);
}
/** {@inheritDoc} */
- @Override public <T> T getAdapter(Class<T> clazz) {
- if (clazz == FileSystem.class)
- return (T)fileSys;
+ @SuppressWarnings("unchecked")
+ @Override public <T> T unwrap(Class<T> cls) {
+ if (HadoopFileSystemFactory.class.isAssignableFrom(cls))
+ return (T)factory;
return null;
}
+
+ /**
+ * Create file system.
+ *
+ * @return File system.
+ * @throws IOException If failed.
+ */
+ private FileSystem get() throws IOException {
+ return factory.get(FileSystemConfiguration.DFLT_USER_NAME);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 4ddfb0d..d9b5d66 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -17,12 +17,6 @@
package org.apache.ignite.igfs;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -34,9 +28,9 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
import org.apache.ignite.internal.util.typedef.G;
@@ -48,6 +42,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Callable;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -173,12 +174,16 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
else
primaryConfFullPath = null;
- SecondaryFileSystemProvider provider =
- new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
+
+ fac.setConfigPaths(primaryConfFullPath);
+ fac.setUri(primaryFsUriStr);
+
+ fac.start();
- primaryFs = provider.createFileSystem(null);
+ primaryFs = fac.get(null); //provider.createFileSystem(null);
- primaryFsUri = provider.uri();
+ primaryFsUri = primaryFs.getUri();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index d368955..6617127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -17,29 +17,6 @@
package org.apache.ignite.igfs;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -59,6 +36,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx;
@@ -70,6 +48,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -79,6 +58,30 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ThreadLocalRandom8;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -380,9 +383,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
cfg.setPrefetchBlocks(1);
cfg.setDefaultMode(mode);
- if (mode != PRIMARY)
- cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
- SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
+ if (mode != PRIMARY) {
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
+
+ fac.setUri(SECONDARY_URI);
+ fac.setConfigPaths(SECONDARY_CFG_PATH);
+
+ IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem();
+
+ sec.setFileSystemFactory(fac);
+ sec.setDefaultUserName(SECONDARY_FS_USER);
+
+ // NB: start() will be invoked upon IgfsImpl init.
+ cfg.setSecondaryFileSystem(sec);
+ }
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
@@ -398,7 +412,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
@Override public Object call() throws Exception {
return new IgniteHadoopFileSystem().getUri();
}
- }, IllegalStateException.class, "URI is null (was IgniteHadoopFileSystem properly initialized?).");
+ }, IllegalStateException.class,
+ "URI is null (was IgniteHadoopFileSystem properly initialized?)");
}
/** @throws Exception If failed. */
@@ -506,7 +521,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
// Ensure that IO is stopped when nobody else is need it.
fs.close();
- assertEquals(initSize - 1, cache.size());
+ assert initSize >= cache.size();
assert (Boolean)stopField.get(io);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 6c542b5..9092f32 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest;
import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
+import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest;
import org.apache.ignite.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest;
import org.apache.ignite.igfs.HadoopIgfsDualAsyncSelfTest;
import org.apache.ignite.igfs.HadoopIgfsDualSyncSelfTest;
@@ -113,6 +114,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName())));