You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/03/02 15:03:55 UTC
[06/50] [abbrv] incubator-ignite git commit: # IGNITE-348: Applied
patch from Ivan V..
# IGNITE-348: Applied patch from Ivan V..
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23bee413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23bee413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23bee413
Branch: refs/heads/ignite-sql-tests
Commit: 23bee413c6e4f382036ab63433e1a38927a8347f
Parents: 4e7463d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Feb 27 13:30:53 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Feb 27 13:30:53 2015 +0300
----------------------------------------------------------------------
config/hadoop/default-config.xml | 12 +
.../hadoop/IgfsHadoopFileSystemWrapper.java | 412 ++++++++++++++++++
.../igfs/hadoop/v1/IgfsHadoopFileSystem.java | 1 +
.../igfs/hadoop/v2/IgfsHadoopFileSystem.java | 1 +
.../igfs/hadoop/IgfsHadoopFSProperties.java | 10 +-
.../hadoop/IgfsHadoopFileSystemWrapper.java | 413 -------------------
.../internal/igfs/hadoop/IgfsHadoopReader.java | 2 +-
.../apache/ignite/igfs/IgfsEventsTestSuite.java | 2 +-
.../IgfsHadoop20FileSystemAbstractSelfTest.java | 2 +-
.../igfs/IgfsHadoopDualAbstractSelfTest.java | 2 +-
.../IgfsHadoopFileSystemAbstractSelfTest.java | 1 +
...fsHadoopFileSystemSecondaryModeSelfTest.java | 2 +-
12 files changed, 437 insertions(+), 423 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index 5fafad8..a264749 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -129,6 +129,18 @@
<entry key="port" value="10500"/>
</map>
</property>
+
+ <!-- Example secondary file system configuration (IGFS configured over Hadoop HDFS): -->
+ <!--
+ <property name="defaultMode" value="PROXY"/>
+
+ <property name="secondaryFileSystem">
+ <bean class="org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystemWrapper">
+ <constructor-arg name="uri" value="hdfs://1.2.3.4:9000"/>
+ <constructor-arg name="cfgPath" value="/opt/hadoop-server/etc/hadoop/core-site.xml"/>
+ </bean>
+ </property>
+ -->
</bean>
</list>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
new file mode 100644
index 0000000..29dfde5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
+ */
+public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
+ /** Property name for path to Hadoop configuration. */
+ public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
+
+ /** Property name for URI of file system. */
+ public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
+
+ /** Hadoop file system. */
+ private final FileSystem fileSys;
+
+ /** Properties of file system. */
+ private final Map<String, String> props = new HashMap<>();
+
+ /**
+ * Creates the wrapper: builds a Hadoop configuration, obtains the file system and records both settings in props.
+ *
+ * @param uri URI of file system; if {@code null}, the file system is obtained from the configuration alone.
+ * @param cfgPath Additional path to Hadoop configuration; if {@code null}, no extra resource is added.
+ * @throws IgniteCheckedException If the URI is malformed or obtaining the file system fails with an I/O error.
+ */
+ public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
+ Configuration cfg = new Configuration();
+
+ if (cfgPath != null)
+ cfg.addResource(U.resolveIgniteUrl(cfgPath));
+
+ try {
+ fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
+ }
+ catch (IOException | URISyntaxException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ uri = fileSys.getUri().toString();
+
+ if (!uri.endsWith("/"))
+ uri += "/";
+
+ props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+ props.put(SECONDARY_FS_URI, uri);
+ }
+
+ /**
+ * Converts an IGFS path into a Hadoop path qualified with the scheme and authority of the wrapped file system.
+ *
+ * @param path IGFS path; its string form is used as the path component.
+ * @return Hadoop path.
+ */
+ private Path convert(IgfsPath path) {
+ URI uri = fileSys.getUri();
+
+ return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
+ }
+
+ /**
+ * Maps a secondary file system I/O error to an IGFS exception, heuristically detecting an HDFS version mismatch.
+ *
+ * @param e Exception to check; a {@code RemoteException} cause or a "Failed on local" message counts as a mismatch.
+ * @param detailMsg Detailed error message, used only when no mismatch is detected (it is passed to {@code cast}).
+ * @return {@code IgfsInvalidHdfsVersionException} on a suspected mismatch, otherwise the result of {@code cast}.
+ */
+ private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+ boolean wrongVer = X.hasCause(e, RemoteException.class) ||
+ (e.getMessage() != null && e.getMessage().contains("Failed on local"));
+
+ IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
+ new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
+ "version.", e);
+
+ return igfsErr;
+ }
+
+ /**
+ * Casts an IO exception to the matching IGFS exception type; unrecognized types become a plain IgfsException.
+ * @param msg Error message; it is not passed to the exceptions built for file-not-found and directory-not-empty.
+ * @param e IO exception, kept as the cause of the returned exception.
+ * @return IGFS exception.
+ */
+ public static IgfsException cast(String msg, IOException e) {
+ if (e instanceof FileNotFoundException)
+ return new IgfsFileNotFoundException(e);
+ else if (e instanceof ParentNotDirectoryException)
+ return new IgfsParentNotDirectoryException(msg, e);
+ else if (e instanceof PathIsNotEmptyDirectoryException)
+ return new IgfsDirectoryNotEmptyException(e);
+ else if (e instanceof PathExistsException)
+ return new IgfsPathAlreadyExistsException(msg, e);
+ else
+ return new IgfsException(msg, e);
+ }
+
+ /**
+ * Converts the permission, owner and group of a Hadoop FileStatus into an IGFS property map.
+ *
+ * @param status File status; if it carries no permission, {@code FsPermission.getDefault()} is used instead.
+ * @return IGFS attributes; the permission is formatted as an octal string zero-padded to at least four digits.
+ */
+ private static Map<String, String> properties(FileStatus status) {
+ FsPermission perm = status.getPermission();
+
+ if (perm == null)
+ perm = FsPermission.getDefault();
+
+ return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
+ PROP_GROUP_NAME, status.getGroup());
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped Hadoop file system; an IOException is rethrown via handleSecondaryFsError. */
+ @Override public boolean exists(IgfsPath path) {
+ try {
+ return fileSys.exists(convert(path));
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+ }
+ }
+
+ /** {@inheritDoc} Sets the owner (if a user or group name is given) and the permission (if given) on the secondary FS. */
+ @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+ IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
+
+ try {
+ if (props0.userName() != null || props0.groupName() != null)
+ fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+
+ if (props0.permission() != null)
+ fileSys.setPermission(convert(path), props0.permission());
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+ }
+
+ // Result is not used in case of secondary FS, so null is always returned.
+ return null;
+ }
+
+ /** {@inheritDoc} A {@code false} result from the secondary file system is reported as an IgfsException. */
+ @Override public void rename(IgfsPath src, IgfsPath dest) {
+ // Delegate to the secondary file system; both a false result and an IOException are turned into exceptions.
+ try {
+ if (!fileSys.rename(convert(src), convert(dest)))
+ throw new IgfsException("Failed to rename (secondary file system returned false) " +
+ "[src=" + src + ", dest=" + dest + ']');
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
+ }
+ }
+
+ /** {@inheritDoc} Returns the result of the wrapped file system's delete call unchanged. */
+ @Override public boolean delete(IgfsPath path, boolean recursive) {
+ try {
+ return fileSys.delete(convert(path), recursive);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
+ }
+ }
+
+ /** {@inheritDoc} NOTE(review): a false result throws IgniteException here, while rename() throws IgfsException. */
+ @Override public void mkdirs(IgfsPath path) {
+ try {
+ if (!fileSys.mkdirs(convert(path)))
+ throw new IgniteException("Failed to make directories [path=" + path + "]");
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
+ }
+ }
+
+ /** {@inheritDoc} NOTE(review): props may be null but is handed to IgfsHadoopFSProperties unchecked, unlike in create(). */
+ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+ try {
+ if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
+ throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
+ }
+ }
+
+ /** {@inheritDoc} Returns child paths rebuilt under the given IGFS path; a missing path raises IgfsFileNotFoundException. */
+ @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+ try {
+ FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+ if (statuses == null)
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+
+ Collection<IgfsPath> res = new ArrayList<>(statuses.length);
+
+ for (FileStatus status : statuses)
+ res.add(new IgfsPath(path, status.getPath().getName()));
+
+ return res;
+ }
+ catch (FileNotFoundException ignored) {
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+ }
+ }
+
+ /** {@inheritDoc} Wraps each Hadoop status in an IgfsFileImpl; a missing path raises IgfsFileNotFoundException. */
+ @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+ try {
+ FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+ if (statuses == null)
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+
+ Collection<IgfsFile> res = new ArrayList<>(statuses.length);
+
+ for (FileStatus status : statuses) {
+ IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
+ new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
+ properties(status));
+
+ res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
+ }
+
+ return res;
+ }
+ catch (FileNotFoundException ignored) {
+ throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+ }
+ }
+
+ /** {@inheritDoc} Returns a new IgfsHadoopReader over the wrapped file system and the converted path. */
+ @Override public IgfsReader open(IgfsPath path, int bufSize) {
+ return new IgfsHadoopReader(fileSys, convert(path), bufSize);
+ }
+
+ /** {@inheritDoc} Returns the output stream produced by the wrapped file system's create(path, overwrite) call. */
+ @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+ try {
+ return fileSys.create(convert(path), overwrite);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+ }
+ }
+
+ /** {@inheritDoc} A null props map is treated as empty; only its permission is used, and replication is cast to short. */
+ @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+ long blockSize, @Nullable Map<String, String> props) {
+ IgfsHadoopFSProperties props0 =
+ new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+ try {
+ return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
+ null);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
+ ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+ ", blockSize=" + blockSize + "]");
+ }
+ }
+
+ /** {@inheritDoc} NOTE(review): the create and props arguments are unused; only path and bufSize reach the file system. */
+ @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+ @Nullable Map<String, String> props) {
+ try {
+ return fileSys.append(convert(path), bufSize);
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+ }
+ }
+
+ /** {@inheritDoc} Returns a view over the Hadoop file status, or {@code null} if the status is null or the file is not found. */
+ @Override public IgfsFile info(final IgfsPath path) {
+ try {
+ final FileStatus status = fileSys.getFileStatus(convert(path));
+
+ if (status == null)
+ return null;
+
+ final Map<String, String> props = properties(status);
+
+ return new IgfsFile() {
+ @Override public IgfsPath path() {
+ return path;
+ }
+
+ @Override public boolean isFile() {
+ return status.isFile();
+ }
+
+ @Override public boolean isDirectory() {
+ return status.isDirectory();
+ }
+
+ @Override public int blockSize() {
+ return (int)status.getBlockSize();
+ }
+
+ @Override public long groupBlockSize() {
+ return status.getBlockSize();
+ }
+
+ @Override public long accessTime() {
+ return status.getAccessTime();
+ }
+
+ @Override public long modificationTime() {
+ return status.getModificationTime();
+ }
+
+ @Override public String property(String name) throws IllegalArgumentException {
+ String val = props.get(name);
+
+ if (val == null)
+ throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+
+ return val;
+ }
+
+ @Nullable @Override public String property(String name, @Nullable String dfltVal) {
+ String val = props.get(name);
+
+ return val == null ? dfltVal : val;
+ }
+
+ @Override public long length() {
+ return status.getLen();
+ }
+
+ /** {@inheritDoc} Returns the map built from the file status above as is, without copying. */
+ @Override public Map<String, String> properties() {
+ return props;
+ }
+ };
+
+ }
+ catch (FileNotFoundException ignore) {
+ return null;
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+ }
+ }
+
+ /** {@inheritDoc} Reports getSpaceConsumed() of the content summary taken at the file system URI. */
+ @Override public long usedSpaceSize() {
+ try {
+ return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+ }
+ }
+
+ /** {@inheritDoc} Returns the internal map filled by the constructor (config path and URI), not a copy. */
+ @Nullable @Override public Map<String, String> properties() {
+ return props;
+ }
+
+ /** {@inheritDoc} Closes the wrapped Hadoop file system, wrapping an IOException in IgniteCheckedException. */
+ @Override public void close() throws IgniteCheckedException {
+ try {
+ fileSys.close();
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
index 8762d83..2f8b013 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.internal.igfs.common.*;
import org.apache.ignite.internal.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
index a38178c..ff8c50c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.internal.igfs.common.*;
import org.apache.ignite.internal.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
index e0ea1b6..c9d1322 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
@@ -27,7 +27,7 @@ import static org.apache.ignite.IgniteFs.*;
/**
* Hadoop file system properties.
*/
-class IgfsHadoopFSProperties {
+public class IgfsHadoopFSProperties {
/** Username. */
private String usrName;
@@ -43,7 +43,7 @@ class IgfsHadoopFSProperties {
* @param props Properties.
* @throws IgniteException In case of error.
*/
- IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException {
+ public IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException {
usrName = props.get(PROP_USER_NAME);
grpName = props.get(PROP_GROUP_NAME);
@@ -64,7 +64,7 @@ class IgfsHadoopFSProperties {
*
* @return User name.
*/
- String userName() {
+ public String userName() {
return usrName;
}
@@ -73,7 +73,7 @@ class IgfsHadoopFSProperties {
*
* @return Group name.
*/
- String groupName() {
+ public String groupName() {
return grpName;
}
@@ -82,7 +82,7 @@ class IgfsHadoopFSProperties {
*
* @return Permission.
*/
- FsPermission permission() {
+ public FsPermission permission() {
return perm;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
deleted file mode 100644
index 9935466..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
- */
-public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
- /** Property name for path to Hadoop configuration. */
- public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-
- /** Property name for URI of file system. */
- public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
-
- /** Hadoop file system. */
- private final FileSystem fileSys;
-
- /** Properties of file system */
- private final Map<String, String> props = new HashMap<>();
-
- /**
- * Constructor.
- *
- * @param uri URI of file system.
- * @param cfgPath Additional path to Hadoop configuration.
- * @throws IgniteCheckedException In case of error.
- */
- public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
- Configuration cfg = new Configuration();
-
- if (cfgPath != null)
- cfg.addResource(U.resolveIgniteUrl(cfgPath));
-
- try {
- fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
- }
- catch (IOException | URISyntaxException e) {
- throw new IgniteCheckedException(e);
- }
-
- uri = fileSys.getUri().toString();
-
- if (!uri.endsWith("/"))
- uri += "/";
-
- props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
- props.put(SECONDARY_FS_URI, uri);
- }
-
- /**
- * Convert IGFS path into Hadoop path.
- *
- * @param path IGFS path.
- * @return Hadoop path.
- */
- private Path convert(IgfsPath path) {
- URI uri = fileSys.getUri();
-
- return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
- }
-
- /**
- * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
- *
- * @param e Exception to check.
- * @param detailMsg Detailed error message.
- * @return Appropriate exception.
- */
- private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
- boolean wrongVer = X.hasCause(e, RemoteException.class) ||
- (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
- IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
- new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
- "version.", e);
-
-
-
- return igfsErr;
- }
-
- /**
- * Cast IO exception to IGFS exception.
- *
- * @param e IO exception.
- * @return IGFS exception.
- */
- public static IgfsException cast(String msg, IOException e) {
- if (e instanceof FileNotFoundException)
- return new IgfsFileNotFoundException(e);
- else if (e instanceof ParentNotDirectoryException)
- return new IgfsParentNotDirectoryException(msg, e);
- else if (e instanceof PathIsNotEmptyDirectoryException)
- return new IgfsDirectoryNotEmptyException(e);
- else if (e instanceof PathExistsException)
- return new IgfsPathAlreadyExistsException(msg, e);
- else
- return new IgfsException(msg, e);
- }
-
- /**
- * Convert Hadoop FileStatus properties to map.
- *
- * @param status File status.
- * @return IGFS attributes.
- */
- private static Map<String, String> properties(FileStatus status) {
- FsPermission perm = status.getPermission();
-
- if (perm == null)
- perm = FsPermission.getDefault();
-
- return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
- PROP_GROUP_NAME, status.getGroup());
- }
-
- /** {@inheritDoc} */
- @Override public boolean exists(IgfsPath path) {
- try {
- return fileSys.exists(convert(path));
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
- IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
-
- try {
- if (props0.userName() != null || props0.groupName() != null)
- fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
- if (props0.permission() != null)
- fileSys.setPermission(convert(path), props0.permission());
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
- }
-
- //Result is not used in case of secondary FS.
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void rename(IgfsPath src, IgfsPath dest) {
- // Delegate to the secondary file system.
- try {
- if (!fileSys.rename(convert(src), convert(dest)))
- throw new IgfsException("Failed to rename (secondary file system returned false) " +
- "[src=" + src + ", dest=" + dest + ']');
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean delete(IgfsPath path, boolean recursive) {
- try {
- return fileSys.delete(convert(path), recursive);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void mkdirs(IgfsPath path) {
- try {
- if (!fileSys.mkdirs(convert(path)))
- throw new IgniteException("Failed to make directories [path=" + path + "]");
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
- try {
- if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
- throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
- try {
- FileStatus[] statuses = fileSys.listStatus(convert(path));
-
- if (statuses == null)
- throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
-
- Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
- for (FileStatus status : statuses)
- res.add(new IgfsPath(path, status.getPath().getName()));
-
- return res;
- }
- catch (FileNotFoundException ignored) {
- throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
- try {
- FileStatus[] statuses = fileSys.listStatus(convert(path));
-
- if (statuses == null)
- throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
-
- Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
- for (FileStatus status : statuses) {
- IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
- new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
- properties(status));
-
- res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
- }
-
- return res;
- }
- catch (FileNotFoundException ignored) {
- throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsReader open(IgfsPath path, int bufSize) {
- return new IgfsHadoopReader(fileSys, convert(path), bufSize);
- }
-
- /** {@inheritDoc} */
- @Override public OutputStream create(IgfsPath path, boolean overwrite) {
- try {
- return fileSys.create(convert(path), overwrite);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
- long blockSize, @Nullable Map<String, String> props) {
- IgfsHadoopFSProperties props0 =
- new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
-
- try {
- return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
- null);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
- ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
- ", blockSize=" + blockSize + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
- @Nullable Map<String, String> props) {
- try {
- return fileSys.append(convert(path), bufSize);
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile info(final IgfsPath path) {
- try {
- final FileStatus status = fileSys.getFileStatus(convert(path));
-
- if (status == null)
- return null;
-
- final Map<String, String> props = properties(status);
-
- return new IgfsFile() {
- @Override public IgfsPath path() {
- return path;
- }
-
- @Override public boolean isFile() {
- return status.isFile();
- }
-
- @Override public boolean isDirectory() {
- return status.isDirectory();
- }
-
- @Override public int blockSize() {
- return (int)status.getBlockSize();
- }
-
- @Override public long groupBlockSize() {
- return status.getBlockSize();
- }
-
- @Override public long accessTime() {
- return status.getAccessTime();
- }
-
- @Override public long modificationTime() {
- return status.getModificationTime();
- }
-
- @Override public String property(String name) throws IllegalArgumentException {
- String val = props.get(name);
-
- if (val == null)
- throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
-
- return val;
- }
-
- @Nullable @Override public String property(String name, @Nullable String dfltVal) {
- String val = props.get(name);
-
- return val == null ? dfltVal : val;
- }
-
- @Override public long length() {
- return status.getLen();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, String> properties() {
- return props;
- }
- };
-
- }
- catch (FileNotFoundException ignore) {
- return null;
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public long usedSpaceSize() {
- try {
- return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
- }
- catch (IOException e) {
- throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<String, String> properties() {
- return props;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- try {
- fileSys.close();
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
index 7234269..3ab3acc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
@@ -56,7 +56,7 @@ public class IgfsHadoopReader implements IgfsReader {
* @param path Path to the file to open.
* @param bufSize Buffer size.
*/
- IgfsHadoopReader(FileSystem fs, Path path, int bufSize) {
+ public IgfsHadoopReader(FileSystem fs, Path path, int bufSize) {
assert fs != null;
assert path != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index 05a7b1d..29696bf 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -20,7 +20,7 @@ package org.apache.ignite.igfs;
import junit.framework.*;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
index 207bc79..9f9a6d8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.permission.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
index 22c144f..a54e264 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
index 606eb48..7359fdf 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.permission.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem;
import org.apache.ignite.internal.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
index 2e22d93..b88816a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
@@ -21,8 +21,8 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.hadoop.*;
import org.apache.ignite.igfs.hadoop.v1.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;