You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/06/02 14:59:29 UTC
[19/50] [abbrv] incubator-ignite git commit: [IGNITE-958]: IGNITE-218
(Wrong staging permissions while running MR job under hadoop accelerator):
IGFS part.
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8455c7a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8455c7a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8455c7a6
Branch: refs/heads/ignite-745
Commit: 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b
Parents: 3538819
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:40:26 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:40:26 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/igfs/IgfsUserContext.java | 119 +++++++++++
.../hadoop/fs/HadoopLazyConcurrentMap.java | 204 +++++++++++++++++++
2 files changed, 323 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+ /** Thread local to hold the current user context. */
+ private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+ /**
+ * Executes given callable in the given user context.
+ * The main contract of this method is that {@link #currentUser()} method invoked
+ * inside closure always returns 'user' this callable executed with.
+ * @param user the user name to invoke closure on behalf of.
+ * @param clo the closure to execute
+ * @param <T> The type of closure result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+ */
+ public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clo.apply(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clo.apply();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts
+ * callable that throws checked Exception.
+ * The Exception is not ever wrapped anyhow.
+ * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is:
+ * <pre name="code" class="java">
+ * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+ * try {
+ * return IgfsUserContext.doAs(user, new Callable<Foo>() {
+ * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+ * return makeSomeFoo(); // do the job
+ * }
+ * });
+ * }
+ * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+ * throw e;
+ * }
+ * catch (Exception e) {
+ * throw new AssertionError("Must never go there.");
+ * }
+ * }
+ * </pre>
+ * @param user the user name to invoke closure on behalf of.
+ * @param clbl the Callable to execute
+ * @param <T> The type of callable result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+ */
+ public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clbl.call(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clbl.call();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Gets the current context user.
+ * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+ * return null. Otherwise it will return the user name set in the most lower
+ * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+ * @return The current user, may be null.
+ */
+ @Nullable public static String currentUser() {
+ return userStackThreadLocal.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..71b38c4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ *
+ * Despite of the name, does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+ /** The map storing the actual values. */
+ private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+ /** The factory passed in by the client. Will be used for lazy value creation. */
+ private final ValueFactory<K, V> factory;
+
+ /** Lock used to close the objects. */
+ private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+ /** Flag indicating that this map is closed and cleared. */
+ private boolean closed;
+
+ /**
+ * Constructor.
+ * @param factory the factory to create new values lazily.
+ */
+ public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * Gets cached or creates a new value of V.
+ * Never returns null.
+ * @param k the key to associate the value with.
+ * @return the cached or newly created value, never null.
+ * @throws IgniteException on error
+ */
+ public V getOrCreate(K k) {
+ ValueWrapper w = map.get(k);
+
+ if (w == null) {
+ closeLock.readLock().lock();
+
+ try {
+ if (closed)
+ throw new IllegalStateException("Failed to create value for key [" + k
+ + "]: the map is already closed.");
+
+ final ValueWrapper wNew = new ValueWrapper(k);
+
+ w = map.putIfAbsent(k, wNew);
+
+ if (w == null) {
+ wNew.init();
+
+ w = wNew;
+ }
+ }
+ finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
+ try {
+ V v = w.getValue();
+
+ assert v != null;
+
+ return v;
+ }
+ catch (IgniteCheckedException ie) {
+ throw new IgniteException(ie);
+ }
+ }
+
+ /**
+ * Clears the map and closes all the values.
+ */
+ public void close() throws IgniteCheckedException {
+ closeLock.writeLock().lock();
+
+ try {
+ closed = true;
+
+ Exception err = null;
+
+ Set<K> keySet = map.keySet();
+
+ for (K key : keySet) {
+ V v = null;
+
+ try {
+ v = map.get(key).getValue();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
+ if (v != null) {
+ try {
+ v.close();
+ }
+ catch (Exception err0) {
+ if (err == null)
+ err = err0;
+ }
+ }
+ }
+
+ map.clear();
+
+ if (err != null)
+ throw new IgniteCheckedException(err);
+ }
+ finally {
+ closeLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Helper class that drives the lazy value creation.
+ */
+ private class ValueWrapper {
+ /** Future. */
+ private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+ /** the key */
+ private final K key;
+
+ /**
+ * Creates new wrapper.
+ */
+ private ValueWrapper(K key) {
+ this.key = key;
+ }
+
+ /**
+ * Initializes the value using the factory.
+ */
+ private void init() {
+ try {
+ final V v0 = factory.createValue(key);
+
+ if (v0 == null)
+ throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+
+ fut.onDone(v0);
+ }
+ catch (Throwable e) {
+ fut.onDone(e);
+ }
+ }
+
+ /**
+ * Gets the available value or blocks until the value is initialized.
+ * @return the value, never null.
+ * @throws IgniteCheckedException on error.
+ */
+ V getValue() throws IgniteCheckedException {
+ return fut.get();
+ }
+ }
+
+ /**
+ * Interface representing the factory that creates map values.
+ * @param <K> the type of the key.
+ * @param <V> the type of the value.
+ */
+ public interface ValueFactory <K, V> {
+ /**
+ * Creates the new value. Should never return null.
+ *
+ * @param key the key to create value for
+ * @return the value.
+ * @throws IgniteException on failure.
+ */
+ public V createValue(K key);
+ }
+}
\ No newline at end of file