You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 20:12:20 UTC

[43/50] [abbrv] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.

[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8455c7a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8455c7a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8455c7a6

Branch: refs/heads/ignite-218
Commit: 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b
Parents: 3538819
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:40:26 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:40:26 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java | 119 +++++++++++
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 204 +++++++++++++++++++
 2 files changed, 323 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+    /** Thread local to hold the current user context. */
+    private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+    /**
+     * Executes given callable in the given user context.
+     * The main contract of this method is that {@link #currentUser()} method invoked
+     * inside closure always returns 'user' this callable executed with.
+     * @param user the user name to invoke closure on behalf of.
+     * @param clo the closure to execute
+     * @param <T> The type of closure result.
+     * @return the result of closure execution.
+     * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+     */
+    public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+        final String ctxUser = userStackThreadLocal.get();
+
+        if (F.eq(ctxUser, user))
+            return clo.apply(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clo.apply();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts
+     * callable that throws checked Exception.
+     * The Exception is not ever wrapped anyhow.
+     * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is:
+     * <pre name="code" class="java">
+     *  public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+     *      try {
+     *          return IgfsUserContext.doAs(user, new Callable<Foo>() {
+     *              &#64;Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+     *                  return makeSomeFoo(); // do the job
+     *              }
+     *          });
+     *      }
+     *      catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+     *          throw e;
+     *      }
+     *      catch (Exception e) {
+     *          throw new AssertionError("Must never go there.");
+     *      }
+     *  }
+     * </pre>
+     * @param user the user name to invoke closure on behalf of.
+     * @param clbl the Callable to execute
+     * @param <T> The type of callable result.
+     * @return the result of closure execution.
+     * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+     */
+    public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+        final String ctxUser = userStackThreadLocal.get();
+
+        if (F.eq(ctxUser, user))
+            return clbl.call(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clbl.call();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Gets the current context user.
+     * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+     * return null. Otherwise it will return the user name set in the most lower
+     * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+     * @return The current user, may be null.
+     */
+    @Nullable public static String currentUser() {
+        return userStackThreadLocal.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..71b38c4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ *
+ * Despite of the name, does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+    /** The factory passed in by the client. Will be used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /** Lock used to close the objects. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     * @param factory the factory to create new values lazily.
+     */
+    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Gets cached or creates a new value of V.
+     * Never returns null.
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteException on error
+     */
+    public V getOrCreate(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null) {
+            closeLock.readLock().lock();
+
+            try {
+                if (closed)
+                    throw new IllegalStateException("Failed to create value for key [" + k
+                        + "]: the map is already closed.");
+
+                final ValueWrapper wNew = new ValueWrapper(k);
+
+                w = map.putIfAbsent(k, wNew);
+
+                if (w == null) {
+                    wNew.init();
+
+                    w = wNew;
+                }
+            }
+            finally {
+                closeLock.readLock().unlock();
+            }
+        }
+
+        try {
+            V v = w.getValue();
+
+            assert v != null;
+
+            return v;
+        }
+        catch (IgniteCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Clears the map and closes all the values.
+     */
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
+
+        try {
+            closed = true;
+
+            Exception err = null;
+
+            Set<K> keySet = map.keySet();
+
+            for (K key : keySet) {
+                V v = null;
+
+                try {
+                    v = map.get(key).getValue();
+                }
+                catch (IgniteCheckedException ignore) {
+                    // No-op.
+                }
+
+                if (v != null) {
+                    try {
+                        v.close();
+                    }
+                    catch (Exception err0) {
+                        if (err == null)
+                            err = err0;
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (err != null)
+                throw new IgniteCheckedException(err);
+        }
+        finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Future. */
+        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+        /** the key */
+        private final K key;
+
+        /**
+         * Creates new wrapper.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory.
+         */
+        private void init() {
+            try {
+                final V v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+
+                fut.onDone(v0);
+            }
+            catch (Throwable e) {
+                fut.onDone(e);
+            }
+        }
+
+        /**
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
+         */
+        V getValue() throws IgniteCheckedException {
+            return fut.get();
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     * @param <K> the type of the key.
+     * @param <V> the type of the value.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value. Should never return null.
+         *
+         * @param key the key to create value for
+         * @return the value.
+         * @throws IgniteException on failure.
+         */
+        public V createValue(K key);
+    }
+}
\ No newline at end of file