You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/23 11:40:53 UTC
[11/12] ignite git commit: IGNITE-2206: WIP.
IGNITE-2206: WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13f8170d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13f8170d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13f8170d
Branch: refs/heads/ignite-2206
Commit: 13f8170da81007b4a32eb3918a125df120bb0065
Parents: 2f38e46
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Dec 23 13:17:34 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 23 13:17:34 2015 +0300
----------------------------------------------------------------------
.../igfs/secondary/IgfsSecondaryFileSystem.java | 22 --
.../processors/hadoop/HadoopPayloadAware.java | 22 +-
.../ignite/internal/processors/igfs/IgfsEx.java | 13 --
.../processors/igfs/IgfsHandshakeResponse.java | 3 +-
.../internal/processors/igfs/IgfsImpl.java | 10 +-
.../internal/processors/igfs/IgfsPaths.java | 32 +--
.../igfs/IgfsSecondaryFileSystemImpl.java | 15 --
.../ignite/internal/util/lang/GridFunc.java | 24 ---
.../visor/node/VisorIgfsConfiguration.java | 57 ------
.../fs/CachingHadoopFileSystemFactory.java | 202 +++++++++++++++++++
.../hadoop/fs/HadoopFileSystemFactory.java | 41 ++++
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 45 ++---
.../fs/v1/DefaultHadoopFileSystemFactory.java | 185 -----------------
.../hadoop/fs/v1/HadoopFileSystemFactory.java | 21 --
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 3 +-
.../hadoop/SecondaryFileSystemProvider.java | 5 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 8 +-
17 files changed, 307 insertions(+), 401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 354b0ae..3f124eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -192,26 +192,4 @@ public interface IgfsSecondaryFileSystem {
* @throws IgniteException In case of error.
*/
public long usedSpaceSize() throws IgniteException;
-
-// /**
-// * Gets the implementation specific properties of file system.
-// *
-// * @return Map of properties.
-// * @deprecated Should not be used.
-// */
-// @Deprecated
-// public Map<String,String> properties();
-//
-// /**
-// *
-// * @return The factory.
-// */
-// public @Nullable HadoopFileSystemFactory<F> getSecondaryFileSystemFactory();
-
-
- /**
- * Closes the secondary file system.
- * @throws IgniteException in case of an error.
- */
- public void close() throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
index dcb163f..9b79729 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java
@@ -1,12 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.hadoop;
/**
- *
+ * Gets payload for Hadoop secondary file system.
*/
public interface HadoopPayloadAware {
/**
- *
- * @return
+ * @return Payload.
*/
public Object getPayload();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index a338813..cf268e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -43,19 +43,6 @@ public interface IgfsEx extends IgniteFileSystem {
/** File property: prefer writes to local node. */
public static final String PROP_PREFER_LOCAL_WRITES = "locWrite";
-// /** Property name for path to Hadoop configuration. */
-// public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-//
-// /** Property name for URI of file system. */
-// public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
-//
-// /** Property name for default user name of file system.
-// * NOTE: for secondary file system this is just a default user name, which is used
-// * when the 2ndary filesystem is used outside of any user context.
-// * If another user name is set in the context, 2ndary file system will work on behalf
-// * of that user, which is different from the default. */
-// public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
-
/**
* Stops IGFS cleaning all used resources.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
index 52cae8f..1ba98ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
@@ -55,8 +55,7 @@ public class IgfsHandshakeResponse implements Externalizable {
* @param paths Secondary paths.
* @param blockSize Server default block size.
*/
- public IgfsHandshakeResponse(String igfsName, IgfsPaths paths,
- long blockSize, Boolean sampling) {
+ public IgfsHandshakeResponse(String igfsName, IgfsPaths paths, long blockSize, Boolean sampling) {
assert paths != null;
this.igfsName = igfsName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 7453e15..a44329a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -264,12 +264,7 @@ public final class IgfsImpl implements IgfsEx {
if (secondaryFs instanceof HadoopPayloadAware)
secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload();
- secondaryPaths = new IgfsPaths(
- //secondaryFs == null ? null : secondaryFs.properties(),
- //secondaryFs == null ? null : secondaryFs.getSecondaryFileSystemFactory(),
- secondaryFsPayload,
- dfltMode,
- modeRslvr.modesOrdered());
+ secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered());
// Check whether IGFS LRU eviction policy is set on data cache.
String dataCacheName = igfsCtx.configuration().getDataCacheName();
@@ -317,7 +312,8 @@ public final class IgfsImpl implements IgfsEx {
batch.cancel();
try {
- secondaryFs.close();
+ if (secondaryFs instanceof LifecycleAware)
+ ((LifecycleAware)secondaryFs).stop();
}
catch (Exception e) {
log.error("Failed to close secondary file system.", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index 83451db..986f59f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -59,12 +59,11 @@ public class IgfsPaths implements Externalizable {
/**
* Constructor.
*
+ * @param payload Payload.
* @param dfltMode Default IGFS mode.
* @param pathModes Path modes.
*/
- public IgfsPaths(Object payload,
- IgfsMode dfltMode,
- @Nullable List<T2<IgfsPath, IgfsMode>> pathModes) {
+ public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes) {
this.payload = payload;
this.dfltMode = dfltMode;
this.pathModes = pathModes;
@@ -84,10 +83,15 @@ public class IgfsPaths implements Externalizable {
return pathModes;
}
+ /**
+ * @return Payload.
+ */
+ @Nullable public Object getPayload() {
+ return payload;
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
-// U.writeStringMap(out, props);
-
writePayload(out);
U.writeEnum(out, dfltMode);
@@ -106,9 +110,10 @@ public class IgfsPaths implements Externalizable {
}
/**
+ * Write payload.
*
- * @param out
- * @throws IOException
+ * @param out Output stream.
+ * @throws IOException If failed.
*/
private void writePayload(ObjectOutput out) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -127,8 +132,6 @@ public class IgfsPaths implements Externalizable {
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-// props = U.readStringMap(in);
-
readPayload(in);
dfltMode = IgfsMode.fromOrdinal(in.readByte());
@@ -150,10 +153,11 @@ public class IgfsPaths implements Externalizable {
}
/**
+ * Read payload.
*
- * @param in
- * @throws IOException
- * @throws ClassNotFoundException
+ * @param in Input stream.
+ * @throws IOException If failed.
+ * @throws ClassNotFoundException If failed.
*/
private void readPayload(ObjectInput in) throws IOException, ClassNotFoundException {
byte[] factoryBytes = U.readByteArray(in);
@@ -167,8 +171,4 @@ public class IgfsPaths implements Externalizable {
oi.close();
}
}
-
- public Object getPayload() {
- return payload;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index ced6b21..44e858f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -115,19 +115,4 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public long usedSpaceSize() throws IgniteException {
return igfs.usedSpaceSize();
}
-
-// /** {@inheritDoc} */
-// @Override public Map<String, String> properties() {
-// return Collections.emptyMap();
-// }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- // No-op.
- }
-
-// /** {@inheritDoc} */
-// @Override public HadoopFileSystemFactory<IgfsEx> getSecondaryFileSystemFactory() {
-// return null;
-// }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 224d205..8d5a8e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2290,30 +2290,6 @@ public class GridFunc {
}
/**
- * Nullifies an empty String.
- * @param x The argument.
- * @return Nullified argument.
- */
- public static String nullifyEmpty(String x) {
- if (isEmpty(x))
- return null;
-
- return x;
- }
-
- /**
- * Nullifies an empty collection.
- * @param c The argument.
- * @return Nullified argument.
- */
- public static <T> Collection<T> nullifyEmpty(Collection<T> c) {
- if (isEmpty(c))
- return null;
-
- return c;
- }
-
- /**
* Utility map getter. This method analogous to {@link #addIfAbsent(Map, Object, Callable)}
* method but this one doesn't put the default value into the map when key is not found.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
index 2238fdf..ea0e721 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
@@ -29,9 +29,6 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
-//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
-//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.compactClass;
/**
@@ -65,17 +62,6 @@ public class VisorIgfsConfiguration implements Serializable {
/** Number of batches that can be concurrently sent to remote node. */
private int perNodeParallelBatchCnt;
-// /** URI of the secondary Hadoop file system. */
-// private String secondaryHadoopFileSysUri;
-//
-// /** Path for the secondary hadoop file system config. */
-// private String secondaryHadoopFileSysCfgPath;
-//
-// /** User name for the secondary hadoop file system config. */
-// private String secondaryHadoopFileSysUserName;
-
-// private HadoopFileSystemFactory factory;
-
/** IGFS instance mode. */
private IgfsMode dfltMode;
@@ -143,19 +129,6 @@ public class VisorIgfsConfiguration implements Serializable {
cfg.perNodeBatchSize = igfs.getPerNodeBatchSize();
cfg.perNodeParallelBatchCnt = igfs.getPerNodeParallelBatchCount();
- IgfsSecondaryFileSystem secFs = igfs.getSecondaryFileSystem();
-
-// if (secFs != null) {
-// //Map<String, String> props = secFs.properties();
-//
-// //cfg.secondaryHadoopFileSysUri = props.get(SECONDARY_FS_URI);
-// //cfg.secondaryHadoopFileSysCfgPath = props.get(SECONDARY_FS_CONFIG_PATH);
-// //cfg.secondaryHadoopFileSysUserName = props.get(SECONDARY_FS_USER_NAME);
-//
-// // Just take and save the factory object:
-// cfg.factory = secFs.getSecondaryFileSystemFactory();
-// }
-
cfg.dfltMode = igfs.getDefaultMode();
cfg.pathModes = igfs.getPathModes();
cfg.dualModePutExecutorSrvc = compactClass(igfs.getDualModePutExecutorService());
@@ -255,36 +228,6 @@ public class VisorIgfsConfiguration implements Serializable {
return perNodeParallelBatchCnt;
}
-// /**
-// * @return URI of the secondary Hadoop file system.
-// */
-// @Nullable public String secondaryHadoopFileSystemUri() {
-// return secondaryHadoopFileSysUri;
-// }
-//
-// /**
-// * @return User name of the secondary Hadoop file system.
-// */
-// @Nullable public String secondaryHadoopFileSystemUserName() {
-// return secondaryHadoopFileSysUserName;
-// }
-//
-// /**
-// * @return Path for the secondary hadoop file system config.
-// */
-// @Nullable public String secondaryHadoopFileSystemConfigPath() {
-// return secondaryHadoopFileSysCfgPath;
-// }
-
-// /**
-// *
-// * @param <T>
-// * @return
-// */
-// public <T> HadoopFileSystemFactory<T> secondaryFileSystemFactory() {
-// return factory;
-// }
-
/**
* @return IGFS instance mode.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
new file mode 100644
index 0000000..3bad850
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.igfs.IgfsPaths;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+/**
+ * The class is to be instantiated as a Spring bean, so it must have a public zero-arg constructor.
+ * The class is serializable as it will be transferred over the network as a part of {@link IgfsPaths} object.
+ */
+public class CachingHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware {
+ /** Lazy per-user cache for the file systems. It is closed in the {@link #stop()} method. */
+ private final transient HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
+ @Override public FileSystem createValue(String key) {
+ try {
+ assert !F.isEmpty(key);
+
+ return createFileSystem(key);
+ }
+ catch (IOException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+
+ /** Configuration of the secondary file system; initialized in {@link #start()}. */
+ protected transient Configuration cfg;
+
+ /** */
+ protected transient URI uri;
+
+ /** */
+ protected String uriStr;
+
+ /** */
+ protected List<String> cfgPathStr;
+
+ int getCount = 0;
+
+ /**
+ * Public zero-arg constructor.
+ */
+ public CachingHadoopFileSystemFactory() {
+ //
+
+
+
+ }
+
+ @Override public FileSystem create(String userName) throws IOException {
+ A.ensure(cfg != null, "cfg");
+
+ if (getCount == 0)
+ assert fileSysLazyMap.size() == 0;
+
+ getCount++;
+
+ return fileSysLazyMap.getOrCreate(userName);
+ }
+
+ // TODO: Add getter.
+
+ /**
+ * Uri setter.
+ * @param uriStr Secondary file system URI string.
+ */
+ public void setUri(String uriStr) {
+ this.uriStr = uriStr;
+ }
+
+ // TODO: Add getter.
+
+ /**
+ * Configuration(s) setter, to be invoked from Spring config.
+ * @param cfgPaths Paths to the secondary file system configuration resources.
+ */
+ public void setConfigPaths(List<String> cfgPaths) {
+ this.cfgPathStr = cfgPaths;
+ }
+
+ /**
+ * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
+ * @throws IOException If the file system could not be created.
+ */
+ protected FileSystem createFileSystem(String userName) throws IOException {
+ userName = IgfsUtils.fixUserName(userName);
+
+ assert cfg != null;
+
+ final FileSystem fileSys;
+
+ try {
+ fileSys = FileSystem.get(uri, cfg, userName);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", e);
+ }
+
+ return fileSys;
+ }
+
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, uriStr);
+
+ U.writeCollection(out, cfgPathStr);
+ }
+
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ uriStr = U.readString(in);
+
+ cfgPathStr = new ArrayList(U.readCollection(in));
+ }
+
+ @Override public void start() throws IgniteException {
+ cfg = HadoopUtils.safeCreateConfiguration();
+
+ if (cfgPathStr != null) {
+ for (String confPath : cfgPathStr) {
+ if (confPath != null) {
+ URL url = U.resolveIgniteUrl(confPath);
+
+ if (url == null) {
+ // If confPath is given, it should be resolvable:
+ throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
+
+ "(ensure that it exists locally and you have read access to it): " + confPath);
+ }
+
+ cfg.addResource(url);
+ }
+ }
+ }
+
+ // if secondary fs URI is not given explicitly, try to get it from the configuration:
+ if (uriStr == null)
+ uri = FileSystem.getDefaultUri(cfg);
+ else {
+ try {
+ uri = new URI(uriStr);
+ }
+ catch (URISyntaxException use) {
+ throw new IgniteException("Failed to resolve secondary file system URI: " + uriStr);
+ }
+ }
+
+ assert uriStr != null;
+
+ // Disable caching:
+ String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
+
+ cfg.setBoolean(prop, true);
+ }
+
+ @Override public void stop() throws IgniteException {
+ try {
+ fileSysLazyMap.close();
+ }
+ catch (IgniteCheckedException ice) {
+ throw new IgniteException(ice);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
new file mode 100644
index 0000000..b4edab8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * This factory is {@link Serializable} because it should be transferable over the network.
+ * <p>
+ * Implementations may choose not to construct a new instance, but instead
+ * return a previously created instance.
+ */
+public interface HadoopFileSystemFactory extends Serializable {
+ /**
+ * Creates the file system, possibly taking a cached instance.
+ * All the other data needed for the file system creation are expected to be contained
+ * in this object instance.
+ *
+ * @param userName The user name
+ * @return The file system.
+ * @throws IOException On error.
+ */
+ public FileSystem create(String userName) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 86ed7a0..756989c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.HadoopFileSystemFactory;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
@@ -47,14 +46,12 @@ import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
-import org.apache.ignite.hadoop.fs.v1.DefaultHadoopFileSystemFactory;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
import org.apache.ignite.internal.processors.igfs.IgfsFileInfo;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
-import static org.apache.ignite.internal.util.typedef.F.*;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lifecycle.LifecycleAware;
@@ -95,7 +92,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
// private @Nullable Collection<URI> cfgPaths;
/** The default user name. It is used if no user context is set. */
- private String dfltUserName = IgfsUtils.fixUserName(null);
+ private String usrName = IgfsUtils.fixUserName(null);
/** */
private HadoopFileSystemFactory fsFactory;
@@ -160,33 +157,22 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Deprecated
public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
@Nullable String userName) throws IgniteCheckedException {
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
- uri = nullifyEmpty(uri);
-// if (uri != null)
-// U.warn(null, "This constructor is deprecated. URI value passed in will be ignored.");
-
- cfgPath = nullifyEmpty(cfgPath);
-// if (cfgPath != null)
-// U.warn(null, "This constructor is deprecated. The configurationPath value passed in will be ignored.");
-
- DefaultHadoopFileSystemFactory fac = new DefaultHadoopFileSystemFactory();
-
- if (uri != null)
- fac.setUri(uri);
+ fac.setUri(uri);
if (cfgPath != null)
fac.setConfigPaths(Collections.singletonList(cfgPath));
- setFsFactory(fac);
-
- setDfltUserName(userName);
+ setFileSystemFactory(fac);
+ setUserName(userName);
}
/**
*
* @param factory
*/
- public void setFsFactory(HadoopFileSystemFactory factory) {
+ public void setFileSystemFactory(HadoopFileSystemFactory factory) {
A.ensure(factory != null, "Factory value must not be null.");
this.fsFactory = factory;
@@ -194,10 +180,11 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/**
*
- * @param dfltUserName
+ * @param usrName Default user name, used if no user context is set.
*/
- public void setDfltUserName(String dfltUserName) {
- this.dfltUserName = IgfsUtils.fixUserName(nullifyEmpty(dfltUserName));
+ public void setUserName(String usrName) {
+ // TODO: Move fix to start routine.
+ this.usrName = IgfsUtils.fixUserName(usrName);
}
// /**
@@ -580,17 +567,17 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
String user = IgfsUserContext.currentUser();
if (F.isEmpty(user))
- user = dfltUserName; // default is never empty.
+ user = usrName; // default is never empty.
assert !F.isEmpty(user);
- if (F.eq(user, dfltUserName))
+ if (F.eq(user, usrName))
return dfltFs; // optimization
//assert fsFactory.uri() != null : "uri!";
try {
- return fsFactory.get(user);
+ return fsFactory.create(user);
}
catch (IOException ioe) {
throw new IgniteException(ioe);
@@ -605,7 +592,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public void start() {
// #start() should not ever be invoked if these properties are not set:
A.ensure(fsFactory != null, "factory");
- A.ensure(dfltUserName != null, "dfltUserName");
+ A.ensure(usrName != null, "dfltUserName");
// Avoid
if (started.compareAndSet(false, true)) {
@@ -618,7 +605,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
// File system creation for the default user name.
// The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
//this.dfltFs = secProvider.createFileSystem(dfltUserName);
- this.dfltFs = fsFactory.get(dfltUserName);
+ this.dfltFs = fsFactory.create(usrName);
assert dfltFs != null;
}
@@ -633,6 +620,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
// return fsFactory;
// }
+
+
@Override public void stop() throws IgniteException {
close();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
deleted file mode 100644
index ba4bfdd..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/DefaultHadoopFileSystemFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package org.apache.ignite.hadoop.fs.v1;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-import org.apache.ignite.internal.processors.igfs.IgfsPaths;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-
-import static org.apache.ignite.internal.util.lang.GridFunc.nullifyEmpty;
-
-/**
- * The class is to be instantiated as a Spring beans, so it must have public zero-arg constructor.
- * The class is serializable as it will be transferred over the network as a part of {@link IgfsPaths} object.
- */
-public class DefaultHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware {
- /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
- private final transient HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
- new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
- @Override public FileSystem createValue(String key) {
- try {
- assert !F.isEmpty(key);
-
- return createFileSystem(key);
- }
- catch (IOException ioe) {
- throw new IgniteException(ioe);
- }
- }
- }
- );
-
- /** Configuration of the secondary filesystem, never null. */
- protected transient Configuration cfg;
-
- /** */
- protected transient URI uri;
-
- /** */
- protected String uriStr;
-
- /** */
- protected List<String> cfgPathStr;
-
- int getCount = 0;
-
- /**
- *
- */
- public DefaultHadoopFileSystemFactory() {
- //
-
-
-
- }
-
- @Override public FileSystem get(String userName) throws IOException {
- A.ensure(cfg != null, "cfg");
-
- if (getCount == 0)
- assert fileSysLazyMap.size() == 0;
-
- getCount++;
-
- return fileSysLazyMap.getOrCreate(userName);
- }
-
- /**
- * Uri setter.
- * @param uriStr
- */
- public void setUri(String uriStr) {
- this.uriStr = uriStr;
- }
-
- /**
- * Configuration(s) setter, to be invoked from Spring config.
- * @param cfgPaths
- */
- public void setConfigPaths(List<String> cfgPaths) {
- this.cfgPathStr = (List)nullifyEmpty(cfgPaths);
- }
-
- /**
- * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
- * @throws IOException
- */
- protected FileSystem createFileSystem(String userName) throws IOException {
- userName = IgfsUtils.fixUserName(nullifyEmpty(userName));
-
- assert cfg != null;
-
- final FileSystem fileSys;
-
- try {
- fileSys = FileSystem.get(uri, cfg, userName);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
-
- return fileSys;
- }
-
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, uriStr);
-
- U.writeCollection(out, cfgPathStr);
- }
-
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- uriStr = U.readString(in);
-
- cfgPathStr = new ArrayList(U.readCollection(in));
- }
-
- @Override public void start() throws IgniteException {
- cfg = HadoopUtils.safeCreateConfiguration();
-
- if (cfgPathStr != null) {
- for (String confPath : cfgPathStr) {
- confPath = nullifyEmpty(confPath);
-
- if (confPath != null) {
- URL url = U.resolveIgniteUrl(confPath);
-
- if (url == null) {
- // If secConfPath is given, it should be resolvable:
- throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
-
- "(ensure that it exists locally and you have read access to it): " + confPath);
- }
-
- cfg.addResource(url);
- }
- }
- }
-
- // if secondary fs URI is not given explicitly, try to get it from the configuration:
- if (uriStr == null)
- uri = FileSystem.getDefaultUri(cfg);
- else {
- try {
- uri = new URI(uriStr);
- }
- catch (URISyntaxException use) {
- throw new IgniteException("Failed to resolve secondary file system URI: " + uriStr);
- }
- }
-
- assert uriStr != null;
-
- // Disable caching:
- String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
-
- cfg.setBoolean(prop, true);
- }
-
- @Override public void stop() throws IgniteException {
- try {
- fileSysLazyMap.close();
- }
- catch (IgniteCheckedException ice) {
- throw new IgniteException(ice);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
deleted file mode 100644
index c1c7b9d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/HadoopFileSystemFactory.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.ignite.hadoop.fs.v1;
-
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.hadoop.fs.FileSystem;
-
-/**
- * This factory is {@link Serializable} because it should be transferable over the network.
- */
-public interface HadoopFileSystemFactory extends Serializable {
- /**
- * Gets the file system, possibly creating it or taking a cached instance.
- * All the other data needed for the file system creation are expected to be contained
- * in this object instance.
- *
- * @param userName The user name
- * @return The file system.
- * @throws IOException On error.
- */
- public FileSystem get(String userName) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 355892e..9463412 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
@@ -344,7 +345,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
((LifecycleAware) factory).start();
try {
- secondaryFs = factory.get(user);
+ secondaryFs = factory.create(user);
secondaryUri = secondaryFs.getUri();
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index d0326ea..1e7ac7f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -58,8 +58,7 @@ public class SecondaryFileSystemProvider {
* @throws IOException
*/
public SecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException {
- secUri = nullifyEmpty(secUri);
- confPath = nullifyEmpty(secConfPath);
+ confPath = secConfPath;
if (confPath != null) {
URL url = U.resolveIgniteUrl(confPath);
@@ -96,7 +95,7 @@ public class SecondaryFileSystemProvider {
* @throws IOException
*/
public FileSystem createFileSystem(String userName) throws IOException {
- userName = IgfsUtils.fixUserName(nullifyEmpty(userName));
+ userName = IgfsUtils.fixUserName(userName);
final FileSystem fileSys;
http://git-wip-us.apache.org/repos/asf/ignite/blob/13f8170d/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index 1ce0492..310c390 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -62,7 +62,7 @@ import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.hadoop.fs.v1.DefaultHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsIpcIo;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutProc;
@@ -383,15 +383,15 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
cfg.setDefaultMode(mode);
if (mode != PRIMARY) {
- DefaultHadoopFileSystemFactory fac = new DefaultHadoopFileSystemFactory();
+ CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
fac.setUri(SECONDARY_URI);
fac.setConfigPaths(Collections.singletonList(SECONDARY_CFG_PATH));
IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem();
- sec.setFsFactory(fac);
- sec.setDfltUserName(SECONDARY_FS_USER);
+ sec.setFileSystemFactory(fac);
+ sec.setUserName(SECONDARY_FS_USER);
// NB: start() will be invoked upon IgfsImpl init.
cfg.setSecondaryFileSystem(sec);