You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/28 18:30:07 UTC

[1/4] incubator-ignite git commit: # save.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-218-hdfs-only 2a472e161 -> 3fe658263


# save.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/73cb2bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/73cb2bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/73cb2bc0

Branch: refs/heads/ignite-218-hdfs-only
Commit: 73cb2bc0b7b22a68a7136095b48b33d48053f2e3
Parents: 2a472e1
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 18:37:03 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 18:37:03 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java | 21 ++++++--------------
 1 file changed, 6 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73cb2bc0/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index 1a91677..1242982 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -17,10 +17,7 @@
 
 package org.apache.ignite.igfs;
 
-// TODO: Remove.
-import org.apache.ignite.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -41,14 +38,11 @@ public abstract class IgfsUserContext {
      * @param clo the closure to execute
      * @param <T> The type of closure result.
      * @return the result of closure execution.
-     * @throws NullPointerException if user name is null or empty String or if the closure is null.
+     * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
      */
     public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
         if (F.isEmpty(user))
-            // use NPE to ensure that #doAs() caller will not treat this exception
-            // as the one thrown from the closure:
-            // TODO: use IllegalArgument or IgniteException
-            throw new NullPointerException("Failed to use null or empty user name.");
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
 
         final String ctxUser = userStackThreadLocal.get();
 
@@ -74,7 +68,7 @@ public abstract class IgfsUserContext {
      *  public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
      *      try {
      *          return IgfsUserContext.doAs(user, new Callable<Foo>() {
-     *              @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+     *              &#64;Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
      *                  return makeSomeFoo(); // do the job
      *              }
      *          });
@@ -91,14 +85,11 @@ public abstract class IgfsUserContext {
      * @param clbl the Callable to execute
      * @param <T> The type of callable result.
      * @return the result of closure execution.
-     * @throws NullPointerException if user name is null or empty String or if the closure is null.
+     * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
      */
     public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
         if (F.isEmpty(user))
-            // use NPE to ensure that #doAs() caller will not treat this exception
-            // as the one thrown from the closure:
-            // TODO: use IllegalArgument or IgniteException
-            throw new NullPointerException("Failed to use null or empty user name.");
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
 
         final String ctxUser = userStackThreadLocal.get();
 
@@ -120,7 +111,7 @@ public abstract class IgfsUserContext {
      * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will return null.
      * Otherwise it will return the user name set in the most lower {@link #doAs(String, IgniteOutClosure)} call
      * on the call stack.
-     * @return the current user, may be null.
+     * @return The current user, may be null.
      */
     @Nullable public static String currentUser() {
         return userStackThreadLocal.get();


[3/4] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-218-hdfs-only

Posted by sb...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-218-hdfs-only


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c03cc586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c03cc586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c03cc586

Branch: refs/heads/ignite-218-hdfs-only
Commit: c03cc586ee18f972610b8c9406234e0f05867ba3
Parents: 73cb2bc 98e392c
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 18:45:07 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 18:45:07 2015 +0300

----------------------------------------------------------------------
 .../internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[4/4] incubator-ignite git commit: [IGNITE-218]: corrections after review.

Posted by sb...@apache.org.
[IGNITE-218]: corrections after review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fe65826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fe65826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fe65826

Branch: refs/heads/ignite-218-hdfs-only
Commit: 3fe6582638bdaecd9f8b92c5c437b456d8bec411
Parents: c03cc58
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 19:29:16 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 19:29:16 2015 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 28 +-----
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 97 ++++++++++++++------
 .../hadoop/v2/HadoopV2TaskContext.java          |  5 +-
 3 files changed, 69 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 9ed92ad..51878da 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -466,33 +466,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        final HadoopLazyConcurrentMap<String,FileSystem> map = fileSysLazyMap;
-
-        if (map == null)
-            return; // already cleared.
-
-        List<IOException> ioExs = new LinkedList<>();
-
-        // TODO: Close is not thread-safe.
-        Set<String> keySet = map.keySet();
-
-        for (String key: keySet) {
-            FileSystem fs = map.get(key);
-
-            if (fs != null) {
-                try {
-                    fs.close();
-                }
-                catch (IOException ioe) {
-                    ioExs.add(ioe);
-                }
-            }
-        }
-
-        map.clear();
-
-        if (!ioExs.isEmpty())
-            throw new IgniteCheckedException(ioExs.get(0));
+        fileSysLazyMap.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index cc36ea0..0fe9871 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,31 +18,34 @@
 package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.future.*;
-// TODO: Remove unused
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
 /**
  * Maps values by keys.
  * Values are created lazily using {@link ValueFactory}.
- * Currently only {@link #clear()} method can remove a value.
  *
  * Despite of the name, does not depend on any Hadoop classes.
  */
-public class HadoopLazyConcurrentMap<K, V> {
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
     /** The map storing the actual values. */
     private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
 
     /** The factory passed in by the client. Will be used for lazy value creation. */
     private final ValueFactory<K, V> factory;
 
+    /** Lock used to close the objects. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. */
+    private boolean closed;
+
     /**
      * Constructor.
      * @param factory the factory to create new values lazily.
@@ -59,16 +62,30 @@ public class HadoopLazyConcurrentMap<K, V> {
      * @throws IgniteException on error
      */
     public V getOrCreate(K k) {
-        // TODO: Do "get" first.
-        final ValueWrapper wNew = new ValueWrapper(k);
-
-        ValueWrapper w = map.putIfAbsent(k, wNew);
+        ValueWrapper w = map.get(k);
 
         if (w == null) {
-            // new wrapper 'w' has been put, so init the value:
-            wNew.init();
+            final ValueWrapper wNew = new ValueWrapper(k);
+
+            w = map.putIfAbsent(k, wNew);
+
+            if (w == null) {
+                // new wrapper 'wNew' has been put, so init the value:
+                closeLock.readLock().lock();
 
-            w = wNew;
+                try {
+                    if (closed)
+                        throw new IllegalStateException("Failed to create value for key [" + k
+                            + "]: the map is already closed.");
+
+                    wNew.init();
+                }
+                finally {
+                    closeLock.readLock().unlock();
+                }
+
+                w = wNew;
+            }
         }
 
         try {
@@ -103,20 +120,40 @@ public class HadoopLazyConcurrentMap<K, V> {
     }
 
     /**
-     * Gets the keySet of this map,
-     * the contract is as per {@link ConcurrentMap#keySet()}
-     * @return the set of keys, never null.
+     * Clears the map and closes all the values.
      */
-    public Set<K> keySet() {
-        return map.keySet();
-    }
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
 
-    /**
-     * Clears the map.
-     * Follows the contract of {@link ConcurrentMap#clear()}
-     */
-    public void clear() {
-        map.clear();
+        try {
+            List<IOException> ioExs = new LinkedList<>();
+
+            Set<K> keySet = map.keySet();
+
+            for (K key: keySet) {
+                ValueWrapper w = map.get(key);
+
+                if (w != null) {
+                    try {
+                        V v = w.getValue();
+
+                        v.close();
+                    }
+                    catch (IOException ioe) {
+                        ioExs.add(ioe);
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (!ioExs.isEmpty())
+                throw new IgniteCheckedException(ioExs.get(0));
+
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
+        }
     }
 
     /**
@@ -154,11 +191,11 @@ public class HadoopLazyConcurrentMap<K, V> {
         }
 
         /**
-         * Blocks until the value is initialized.
-         * @return the value
-         * @throws IgniteInterruptedCheckedException if interrupted during wait.
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
          */
-        @Nullable V getValue() throws IgniteCheckedException {
+        V getValue() throws IgniteCheckedException {
             return fut.get();
         }
     }
@@ -170,7 +207,7 @@ public class HadoopLazyConcurrentMap<K, V> {
      */
     public interface ValueFactory <K, V> {
         /**
-         * Creates the new value. Must never return null.
+         * Creates the new value. Should never return null.
          *
          * @param key the key to create value for
          * @return the value.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 2270caa..dd18c66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -239,10 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            FileSystem fs = FileSystem.get(jobConf());
-
-            // TODO: Remove
-            //HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+            FileSystem.get(jobConf());
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 


[2/4] incubator-ignite git commit: [IGNITE-765]: org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.GcWorker#cleanupResources incorrectly handles FileLockInterruptionException

Posted by sb...@apache.org.
[IGNITE-765]: org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.GcWorker#cleanupResources incorrectly handles FileLockInterruptionException


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/98e392cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/98e392cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/98e392cd

Branch: refs/heads/ignite-218-hdfs-only
Commit: 98e392cd79183945c26e9c220787cdbd4f686c26
Parents: b6fc8a9
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 18:39:45 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 18:39:45 2015 +0300

----------------------------------------------------------------------
 .../internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98e392cd/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 86a0886..5185856 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -592,7 +592,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
                 if (log.isDebugEnabled())
                     log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
             }
-            catch (InterruptedIOException ignored) {
+            catch (FileLockInterruptionException ignored) {
                 Thread.currentThread().interrupt();
             }
             catch (IOException e) {