You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/28 18:30:07 UTC
[1/4] incubator-ignite git commit: # save.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-218-hdfs-only 2a472e161 -> 3fe658263
# save.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/73cb2bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/73cb2bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/73cb2bc0
Branch: refs/heads/ignite-218-hdfs-only
Commit: 73cb2bc0b7b22a68a7136095b48b33d48053f2e3
Parents: 2a472e1
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 18:37:03 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 18:37:03 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/igfs/IgfsUserContext.java | 21 ++++++--------------
1 file changed, 6 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73cb2bc0/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index 1a91677..1242982 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -17,10 +17,7 @@
package org.apache.ignite.igfs;
-// TODO: Remove.
-import org.apache.ignite.*;
import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
@@ -41,14 +38,11 @@ public abstract class IgfsUserContext {
* @param clo the closure to execute
* @param <T> The type of closure result.
* @return the result of closure execution.
- * @throws NullPointerException if user name is null or empty String or if the closure is null.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
*/
public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
if (F.isEmpty(user))
- // use NPE to ensure that #doAs() caller will not treat this exception
- // as the one thrown from the closure:
- // TODO: use IllegalArgument or IgniteException
- throw new NullPointerException("Failed to use null or empty user name.");
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
final String ctxUser = userStackThreadLocal.get();
@@ -74,7 +68,7 @@ public abstract class IgfsUserContext {
* public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
* try {
* return IgfsUserContext.doAs(user, new Callable<Foo>() {
- * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+ * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
* return makeSomeFoo(); // do the job
* }
* });
@@ -91,14 +85,11 @@ public abstract class IgfsUserContext {
* @param clbl the Callable to execute
* @param <T> The type of callable result.
* @return the result of closure execution.
- * @throws NullPointerException if user name is null or empty String or if the closure is null.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
*/
public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
if (F.isEmpty(user))
- // use NPE to ensure that #doAs() caller will not treat this exception
- // as the one thrown from the closure:
- // TODO: use IllegalArgument or IgniteException
- throw new NullPointerException("Failed to use null or empty user name.");
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
final String ctxUser = userStackThreadLocal.get();
@@ -120,7 +111,7 @@ public abstract class IgfsUserContext {
* If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will return null.
* Otherwise it will return the user name set in the most lower {@link #doAs(String, IgniteOutClosure)} call
* on the call stack.
- * @return the current user, may be null.
+ * @return The current user, may be null.
*/
@Nullable public static String currentUser() {
return userStackThreadLocal.get();
[3/4] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-218-hdfs-only
Posted by sb...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-218-hdfs-only
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c03cc586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c03cc586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c03cc586
Branch: refs/heads/ignite-218-hdfs-only
Commit: c03cc586ee18f972610b8c9406234e0f05867ba3
Parents: 73cb2bc 98e392c
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 18:45:07 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 18:45:07 2015 +0300
----------------------------------------------------------------------
.../internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[4/4] incubator-ignite git commit: [IGNITE-218]: corrections after review.
Posted by sb...@apache.org.
[IGNITE-218]: corrections after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fe65826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fe65826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fe65826
Branch: refs/heads/ignite-218-hdfs-only
Commit: 3fe6582638bdaecd9f8b92c5c437b456d8bec411
Parents: c03cc58
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 19:29:16 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 19:29:16 2015 +0300
----------------------------------------------------------------------
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 28 +-----
.../hadoop/fs/HadoopLazyConcurrentMap.java | 97 ++++++++++++++------
.../hadoop/v2/HadoopV2TaskContext.java | 5 +-
3 files changed, 69 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 9ed92ad..51878da 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -466,33 +466,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
- final HadoopLazyConcurrentMap<String,FileSystem> map = fileSysLazyMap;
-
- if (map == null)
- return; // already cleared.
-
- List<IOException> ioExs = new LinkedList<>();
-
- // TODO: Close is not thread-safe.
- Set<String> keySet = map.keySet();
-
- for (String key: keySet) {
- FileSystem fs = map.get(key);
-
- if (fs != null) {
- try {
- fs.close();
- }
- catch (IOException ioe) {
- ioExs.add(ioe);
- }
- }
- }
-
- map.clear();
-
- if (!ioExs.isEmpty())
- throw new IgniteCheckedException(ioExs.get(0));
+ fileSysLazyMap.close();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index cc36ea0..0fe9871 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,31 +18,34 @@
package org.apache.ignite.internal.processors.hadoop.fs;
import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.future.*;
-// TODO: Remove unused
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
+import java.io.*;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
/**
* Maps values by keys.
* Values are created lazily using {@link ValueFactory}.
- * Currently only {@link #clear()} method can remove a value.
*
* Despite of the name, does not depend on any Hadoop classes.
*/
-public class HadoopLazyConcurrentMap<K, V> {
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
/** The map storing the actual values. */
private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
/** The factory passed in by the client. Will be used for lazy value creation. */
private final ValueFactory<K, V> factory;
+ /** Lock used to close the objects. */
+ private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+ /** Flag indicating that this map is closed and cleared. */
+ private boolean closed;
+
/**
* Constructor.
* @param factory the factory to create new values lazily.
@@ -59,16 +62,30 @@ public class HadoopLazyConcurrentMap<K, V> {
* @throws IgniteException on error
*/
public V getOrCreate(K k) {
- // TODO: Do "get" first.
- final ValueWrapper wNew = new ValueWrapper(k);
-
- ValueWrapper w = map.putIfAbsent(k, wNew);
+ ValueWrapper w = map.get(k);
if (w == null) {
- // new wrapper 'w' has been put, so init the value:
- wNew.init();
+ final ValueWrapper wNew = new ValueWrapper(k);
+
+ w = map.putIfAbsent(k, wNew);
+
+ if (w == null) {
+ // new wrapper 'w' has been put, so init the value:
+ closeLock.readLock().lock();
- w = wNew;
+ try {
+ if (closed)
+ throw new IllegalStateException("Failed to create value for key [" + k
+ + "]: the map is already closed.");
+
+ wNew.init();
+ }
+ finally {
+ closeLock.readLock().unlock();
+ }
+
+ w = wNew;
+ }
}
try {
@@ -103,20 +120,40 @@ public class HadoopLazyConcurrentMap<K, V> {
}
/**
- * Gets the keySet of this map,
- * the contract is as per {@link ConcurrentMap#keySet()}
- * @return the set of keys, never null.
+ * Clears the map and closes all the values.
*/
- public Set<K> keySet() {
- return map.keySet();
- }
+ public void close() throws IgniteCheckedException {
+ closeLock.writeLock().lock();
- /**
- * Clears the map.
- * Follows the contract of {@link ConcurrentMap#clear()}
- */
- public void clear() {
- map.clear();
+ try {
+ List<IOException> ioExs = new LinkedList<>();
+
+ Set<K> keySet = map.keySet();
+
+ for (K key: keySet) {
+ ValueWrapper w = map.get(key);
+
+ if (w != null) {
+ try {
+ V v = w.getValue();
+
+ v.close();
+ }
+ catch (IOException ioe) {
+ ioExs.add(ioe);
+ }
+ }
+ }
+
+ map.clear();
+
+ if (!ioExs.isEmpty())
+ throw new IgniteCheckedException(ioExs.get(0));
+
+ closed = true;
+ } finally {
+ closeLock.writeLock().unlock();
+ }
}
/**
@@ -154,11 +191,11 @@ public class HadoopLazyConcurrentMap<K, V> {
}
/**
- * Blocks until the value is initialized.
- * @return the value
- * @throws IgniteInterruptedCheckedException if interrupted during wait.
+ * Gets the available value or blocks until the value is initialized.
+ * @return the value, never null.
+ * @throws IgniteCheckedException on error.
*/
- @Nullable V getValue() throws IgniteCheckedException {
+ V getValue() throws IgniteCheckedException {
return fut.get();
}
}
@@ -170,7 +207,7 @@ public class HadoopLazyConcurrentMap<K, V> {
*/
public interface ValueFactory <K, V> {
/**
- * Creates the new value. Must never return null.
+ * Creates the new value. Should never return null.
*
* @param key the key to create value for
* @return the value.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fe65826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 2270caa..dd18c66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -239,10 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
try {
- FileSystem fs = FileSystem.get(jobConf());
-
- // TODO: Remove
- //HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+ FileSystem.get(jobConf());
LocalFileSystem locFs = FileSystem.getLocal(jobConf());
[2/4] incubator-ignite git commit: [IGNITE-765]: org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.GcWorker#cleanupResources incorrectly handles FileLockInterruptionException
Posted by sb...@apache.org.
[IGNITE-765]: org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.GcWorker#cleanupResources incorrectly handles FileLockInterruptionException
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/98e392cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/98e392cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/98e392cd
Branch: refs/heads/ignite-218-hdfs-only
Commit: 98e392cd79183945c26e9c220787cdbd4f686c26
Parents: b6fc8a9
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu May 28 18:39:45 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu May 28 18:39:45 2015 +0300
----------------------------------------------------------------------
.../internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98e392cd/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 86a0886..5185856 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -592,7 +592,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
if (log.isDebugEnabled())
log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
}
- catch (InterruptedIOException ignored) {
+ catch (FileLockInterruptionException ignored) {
Thread.currentThread().interrupt();
}
catch (IOException e) {