You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/05/28 16:40:57 UTC

incubator-ignite git commit: # IGNITE-218: Review.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-218-hdfs-only f23e02fad -> 2a472e161


# IGNITE-218: Review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2a472e16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2a472e16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2a472e16

Branch: refs/heads/ignite-218-hdfs-only
Commit: 2a472e16144c243d94c69ce022884889bd87f464
Parents: f23e02f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu May 28 17:41:18 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu May 28 17:41:18 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java |  3 ++
 .../shmem/IpcSharedMemoryServerEndpoint.java    |  2 +-
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |  5 ++-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  1 +
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 38 ++++++++++----------
 .../hadoop/v2/HadoopV2TaskContext.java          |  1 +
 6 files changed, 28 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index 567fae5..1a91677 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.igfs;
 
+// TODO: Remove.
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -46,6 +47,7 @@ public abstract class IgfsUserContext {
         if (F.isEmpty(user))
             // use NPE to ensure that #doAs() caller will not treat this exception
             // as the one thrown from the closure:
+            // TODO: use IllegalArgument or IgniteException
             throw new NullPointerException("Failed to use null or empty user name.");
 
         final String ctxUser = userStackThreadLocal.get();
@@ -95,6 +97,7 @@ public abstract class IgfsUserContext {
         if (F.isEmpty(user))
             // use NPE to ensure that #doAs() caller will not treat this exception
             // as the one thrown from the closure:
+            // TODO: use IllegalArgument or IgniteException
             throw new NullPointerException("Failed to use null or empty user name.");
 
         final String ctxUser = userStackThreadLocal.get();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index b2bc4cf..86a0886 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -592,7 +592,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
                 if (log.isDebugEnabled())
                     log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
             }
-            catch (InterruptedIOException | FileLockInterruptionException ignored) {
+            catch (InterruptedIOException ignored) {
                 Thread.currentThread().interrupt();
             }
             catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 024cc68..9ed92ad 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -108,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @throws IgniteCheckedException In case of error.
      */
     public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
-        @Nullable String userName)
-            throws IgniteCheckedException {
+        @Nullable String userName) throws IgniteCheckedException {
         // Treat empty uri and userName arguments as nulls to improve configuration usability:
         if (F.isEmpty(uri))
             uri = null;
@@ -144,7 +143,6 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
             props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
 
         props.put(SECONDARY_FS_URI, uri);
-
         props.put(SECONDARY_FS_USER_NAME, dfltUserName);
     }
 
@@ -475,6 +473,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
         List<IOException> ioExs = new LinkedList<>();
 
+        // TODO: Close is not thread-safe.
         Set<String> keySet = map.keySet();
 
         for (String key: keySet) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 46c9ba4..c0a9ade 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -190,6 +190,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
         if (user == null) {
             UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
             if (currUgi != null)
                 user = currUgi.getShortUserName();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index cdafdde..cc36ea0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+// TODO: Remove unused
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 /**
  * Maps values by keys.
@@ -56,6 +59,7 @@ public class HadoopLazyConcurrentMap<K, V> {
      * @throws IgniteException on error
      */
     public V getOrCreate(K k) {
+        // TODO: Do "get" first.
         final ValueWrapper wNew = new ValueWrapper(k);
 
         ValueWrapper w = map.putIfAbsent(k, wNew);
@@ -74,7 +78,7 @@ public class HadoopLazyConcurrentMap<K, V> {
 
             return v;
         }
-        catch (IgniteInterruptedCheckedException ie) {
+        catch (IgniteCheckedException ie) {
             throw new IgniteException(ie);
         }
     }
@@ -93,7 +97,7 @@ public class HadoopLazyConcurrentMap<K, V> {
         try {
             return w.getValue();
         }
-        catch (IgniteInterruptedCheckedException ie) {
+        catch (IgniteCheckedException ie) {
             throw new IgniteException(ie);
         }
     }
@@ -115,20 +119,16 @@ public class HadoopLazyConcurrentMap<K, V> {
         map.clear();
     }
 
-
     /**
      * Helper class that drives the lazy value creation.
      */
     private class ValueWrapper {
-        /** Value creation latch */
-        private final CountDownLatch vlueCrtLatch = new CountDownLatch(1);
+        /** Future. */
+        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
 
         /** the key */
         private final K key;
 
-        /** the value */
-        private V v;
-
         /**
          * Creates new wrapper.
          */
@@ -140,14 +140,17 @@ public class HadoopLazyConcurrentMap<K, V> {
          * Initializes the value using the factory.
          */
         private void init() {
-            final V v0 = factory.createValue(key);
-
-            if (v0 == null)
-                throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+            try {
+                final V v0 = factory.createValue(key);
 
-            v = v0;
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
 
-            vlueCrtLatch.countDown();
+                fut.onDone(v0);
+            }
+            catch (Throwable e) {
+                fut.onDone(e);
+            }
         }
 
         /**
@@ -155,10 +158,8 @@ public class HadoopLazyConcurrentMap<K, V> {
          * @return the value
          * @throws IgniteInterruptedCheckedException if interrupted during wait.
          */
-        @Nullable V getValue() throws IgniteInterruptedCheckedException {
-            U.await(vlueCrtLatch);
-
-            return v;
+        @Nullable V getValue() throws IgniteCheckedException {
+            return fut.get();
         }
     }
 
@@ -170,6 +171,7 @@ public class HadoopLazyConcurrentMap<K, V> {
     public interface ValueFactory <K, V> {
         /**
          * Creates the new value. Must never return null.
+         *
          * @param key the key to create value for
          * @return the value.
          * @throws IgniteException on failure.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 9f5220d..2270caa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -241,6 +241,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         try {
             FileSystem fs = FileSystem.get(jobConf());
 
+            // TODO: Remove
             //HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());