Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 15:26:32 UTC
[42/68] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
deleted file mode 100644
index 2e0e271..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ /dev/null
@@ -1,964 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager;
-import org.apache.ignite.internal.util.ClassCache;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-import org.objectweb.asm.AnnotationVisitor;
-import org.objectweb.asm.Attribute;
-import org.objectweb.asm.ClassReader;
-import org.objectweb.asm.ClassVisitor;
-import org.objectweb.asm.ClassWriter;
-import org.objectweb.asm.FieldVisitor;
-import org.objectweb.asm.Handle;
-import org.objectweb.asm.Label;
-import org.objectweb.asm.MethodVisitor;
-import org.objectweb.asm.Opcodes;
-import org.objectweb.asm.Type;
-import org.objectweb.asm.commons.Remapper;
-import org.objectweb.asm.commons.RemappingClassAdapter;
-
-/**
- * Class loader that can load classes explicitly, without delegating to the parent class loader.
- * It also supports class parsing to find dependencies that transitively pull in classes
- * unavailable to the parent.
- */
-public class HadoopClassLoader extends URLClassLoader implements ClassCache {
- static {
- // We are very parallel capable.
- registerAsParallelCapable();
- }
-
- /** Name of the Hadoop daemon class. */
- public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon";
-
- /** Name of libhadoop library. */
- private static final String LIBHADOOP = "hadoop.";
-
- /** */
- private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader();
-
- /** */
- private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs());
-
- /** */
- private static volatile Collection<URL> hadoopJars;
-
- /** */
- private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>();
-
- /** */
- private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>();
-
- /** Class cache. */
- private final ConcurrentMap<String, Class> cacheMap = new ConcurrentHashMap<>();
-
- /** Diagnostic name of this class loader. */
- @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
- private final String name;
-
- /** Native library names. */
- private final String[] libNames;
-
- /**
- * Gets the name for the job class loader. The name is specific to the local node id.
- * @param locNodeId The local node id.
- * @return The class loader name.
- */
- public static String nameForJob(UUID locNodeId) {
- return "hadoop-job-node-" + locNodeId.toString();
- }
-
- /**
- * Gets the name for the task class loader.
- *
- * @param info The task info.
- * @param prefix If {@code true}, return only the prefix (without task type and number).
- * @return The class loader name.
- */
- public static String nameForTask(HadoopTaskInfo info, boolean prefix) {
- if (prefix)
- return "hadoop-task-" + info.jobId() + "-";
- else
- return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber();
- }
-
- /**
- * Constructor.
- *
- * @param urls Urls.
- * @param name Classloader name.
- * @param libNames Optional additional native library names to be linked from parent classloader.
- */
- public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames) {
- super(addHadoopUrls(urls), APP_CLS_LDR);
-
- assert !(getParent() instanceof HadoopClassLoader);
-
- this.name = name;
- this.libNames = libNames;
-
- initializeNativeLibraries();
- }
-
- /**
- * Workaround to load native Hadoop libraries. Java doesn't allow the same native library to be loaded by more
- * than one class loader. But we load Hadoop classes many times, and one of these classes - {@code NativeCodeLoader} -
- * tries to load the same native library over and over again.
- * <p>
- * To fix the problem, we force native library load in parent class loader and then "link" handle to this native
- * library to our class loader. As a result, our class loader will think that the library is already loaded and will
- * be able to link native methods.
- *
- * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version">
- * JNI specification</a>
- */
- private void initializeNativeLibraries() {
- try {
- // This must trigger native library load.
- Class.forName(NativeCodeLoader.class.getName(), true, APP_CLS_LDR);
-
- final Vector<Object> curVector = U.field(this, "nativeLibraries");
-
- ClassLoader ldr = APP_CLS_LDR;
-
- while (ldr != null) {
- Vector vector = U.field(ldr, "nativeLibraries");
-
- for (Object lib : vector) {
- String name = U.field(lib, "name");
-
- boolean add = name.contains(LIBHADOOP);
-
- if (!add && libNames != null) {
- for (String libName : libNames) {
- if (libName != null && name.contains(libName)) {
- add = true;
-
- break;
- }
- }
- }
-
- if (add) {
- curVector.add(lib);
-
- return;
- }
- }
-
- ldr = ldr.getParent();
- }
- }
- catch (Exception e) {
- U.quietAndWarn(null, "Failed to initialize Hadoop native library " +
- "(native Hadoop methods might not work properly): " + e);
- }
- }
-
- /**
- * Need to parse only Ignite Hadoop and IGFS classes.
- *
- * @param cls Class name.
- * @return {@code true} if we need to check this class.
- */
- private static boolean isHadoopIgfs(String cls) {
- String ignitePkgPrefix = "org.apache.ignite";
-
- int len = ignitePkgPrefix.length();
-
- return cls.startsWith(ignitePkgPrefix) && (
- cls.indexOf("igfs.", len) != -1 ||
- cls.indexOf(".fs.", len) != -1 ||
- cls.indexOf("hadoop.", len) != -1);
- }
-
- /**
- * @param cls Class name.
- * @return {@code true} If this is Hadoop class.
- */
- private static boolean isHadoop(String cls) {
- return cls.startsWith("org.apache.hadoop.");
- }
-
- /** {@inheritDoc} */
- @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
- try {
- if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in the application classpath.
- if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks.
- return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
- else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
- // We replace this in order to be able to forcibly stop some daemon threads
- // that otherwise never stop (e.g. PeerCache runnables):
- return loadFromBytes(name, HadoopDaemon.class.getName());
-
- return loadClassExplicitly(name, resolve);
- }
-
- if (isHadoopIgfs(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop.
- Boolean hasDeps = cache.get(name);
-
- if (hasDeps == null) {
- hasDeps = hasExternalDependencies(name);
-
- cache.put(name, hasDeps);
- }
-
- if (hasDeps)
- return loadClassExplicitly(name, resolve);
- }
-
- return super.loadClass(name, resolve);
- }
- catch (NoClassDefFoundError | ClassNotFoundException e) {
- throw new ClassNotFoundException("Failed to load class: " + name, e);
- }
- }
-
- /**
- * @param name Name.
- * @param replace Replacement.
- * @return Class.
- */
- private Class<?> loadFromBytes(final String name, final String replace) {
- synchronized (getClassLoadingLock(name)) {
- // First, check if the class has already been loaded
- Class c = findLoadedClass(name);
-
- if (c != null)
- return c;
-
- byte[] bytes = bytesCache.get(name);
-
- if (bytes == null) {
- InputStream in = loadClassBytes(getParent(), replace);
-
- ClassReader rdr;
-
- try {
- rdr = new ClassReader(in);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- ClassWriter w = new ClassWriter(Opcodes.ASM4);
-
- rdr.accept(new RemappingClassAdapter(w, new Remapper() {
- /** */
- String replaceType = replace.replace('.', '/');
-
- /** */
- String nameType = name.replace('.', '/');
-
- @Override public String map(String type) {
- if (type.equals(replaceType))
- return nameType;
-
- return type;
- }
- }), ClassReader.EXPAND_FRAMES);
-
- bytes = w.toByteArray();
-
- bytesCache.put(name, bytes);
- }
-
- return defineClass(name, bytes, 0, bytes.length);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Class<?> getFromCache(String clsName) throws ClassNotFoundException {
- Class<?> cls = cacheMap.get(clsName);
-
- if (cls == null) {
- Class old = cacheMap.putIfAbsent(clsName, cls = Class.forName(clsName, true, this));
-
- if (old != null)
- cls = old;
- }
-
- return cls;
- }
-
- /**
- * @param name Class name.
- * @param resolve Resolve class.
- * @return Class.
- * @throws ClassNotFoundException If failed.
- */
- private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException {
- synchronized (getClassLoadingLock(name)) {
- // First, check if the class has already been loaded
- Class c = findLoadedClass(name);
-
- if (c == null) {
- long t1 = System.nanoTime();
-
- c = findClass(name);
-
- // this is the defining class loader; record the stats
- sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
- sun.misc.PerfCounter.getFindClasses().increment();
- }
-
- if (resolve)
- resolveClass(c);
-
- return c;
- }
- }
-
- /**
- * @param ldr Loader.
- * @param clsName Class.
- * @return Input stream.
- */
- @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) {
- return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class");
- }
-
- /**
- * Check whether class has external dependencies on Hadoop.
- *
- * @param clsName Class name.
- * @return {@code True} if class has external dependencies.
- */
- boolean hasExternalDependencies(String clsName) {
- CollectingContext ctx = new CollectingContext();
-
- ctx.annVisitor = new CollectingAnnotationVisitor(ctx);
- ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor);
- ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor);
- ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor);
-
- return hasExternalDependencies(clsName, ctx);
- }
-
- /**
- * Check whether class has external dependencies on Hadoop.
- *
- * @param clsName Class name.
- * @param ctx Context.
- * @return {@code true} If the class has external dependencies.
- */
- boolean hasExternalDependencies(String clsName, CollectingContext ctx) {
- if (isHadoop(clsName)) // Hadoop must not be in the classpath, but IDEs may put it there, so filter it explicitly as external.
- return true;
-
- // Try to get from the parent to check if the type is accessible.
- InputStream in = loadClassBytes(getParent(), clsName);
-
- if (in == null) // The class is external itself, it must be loaded from this class loader.
- return true;
-
- if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies.
- return false;
-
- final ClassReader rdr;
-
- try {
- rdr = new ClassReader(in);
- }
- catch (IOException e) {
- throw new RuntimeException("Failed to read class: " + clsName, e);
- }
-
- ctx.visited.add(clsName);
-
- rdr.accept(ctx.clsVisitor, 0);
-
- if (ctx.found) // We already know that we have dependencies, no need to check parent.
- return true;
-
- // The class itself has no dependencies here, but its enclosing (outer) class may have them.
- int idx = clsName.lastIndexOf('$');
-
- if (idx == -1) // No enclosing class.
- return false;
-
- String parentCls = clsName.substring(0, idx);
-
- if (ctx.visited.contains(parentCls))
- return false;
-
- Boolean res = cache.get(parentCls);
-
- if (res == null)
- res = hasExternalDependencies(parentCls, ctx);
-
- return res;
- }
-
- /**
- * @param name Class name.
- * @return {@code true} If this is a valid class name.
- */
- private static boolean validateClassName(String name) {
- int len = name.length();
-
- if (len <= 1)
- return false;
-
- if (!Character.isJavaIdentifierStart(name.charAt(0)))
- return false;
-
- boolean hasDot = false;
-
- for (int i = 1; i < len; i++) {
- char c = name.charAt(i);
-
- if (c == '.')
- hasDot = true;
- else if (!Character.isJavaIdentifierPart(c))
- return false;
- }
-
- return hasDot;
- }
-
- /**
- * @param urls URLs.
- * @return URLs.
- */
- private static URL[] addHadoopUrls(URL[] urls) {
- Collection<URL> hadoopJars;
-
- try {
- hadoopJars = hadoopUrls();
- }
- catch (IgniteCheckedException e) {
- throw new RuntimeException(e);
- }
-
- ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length));
-
- list.addAll(appJars);
- list.addAll(hadoopJars);
-
- if (!F.isEmpty(urls))
- list.addAll(F.asList(urls));
-
- return list.toArray(new URL[list.size()]);
- }
-
- /**
- * @return Collection of jar URLs.
- * @throws IgniteCheckedException If failed.
- */
- public static Collection<URL> hadoopUrls() throws IgniteCheckedException {
- Collection<URL> hadoopUrls = hadoopJars;
-
- if (hadoopUrls != null)
- return hadoopUrls;
-
- synchronized (HadoopClassLoader.class) {
- hadoopUrls = hadoopJars;
-
- if (hadoopUrls != null)
- return hadoopUrls;
-
- try {
- hadoopUrls = HadoopClasspathUtils.classpathForClassLoader();
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to resolve Hadoop JAR locations: " + e.getMessage(), e);
- }
-
- hadoopJars = hadoopUrls;
-
- return hadoopUrls;
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopClassLoader.class, this);
- }
-
- /**
- * Getter for name field.
- */
- public String name() {
- return name;
- }
-
- /**
- * Context for dependencies collection.
- */
- private class CollectingContext {
- /** Visited classes. */
- private final Set<String> visited = new HashSet<>();
-
- /** Whether dependency found. */
- private boolean found;
-
- /** Annotation visitor. */
- private AnnotationVisitor annVisitor;
-
- /** Method visitor. */
- private MethodVisitor mthdVisitor;
-
- /** Field visitor. */
- private FieldVisitor fldVisitor;
-
- /** Class visitor. */
- private ClassVisitor clsVisitor;
-
- /**
- * Processes a method descriptor.
- *
- * @param methDesc The method descriptor string.
- */
- void onMethodsDesc(final String methDesc) {
- // Process method return type:
- onType(Type.getReturnType(methDesc));
-
- if (found)
- return;
-
- // Process method argument types:
- for (Type t: Type.getArgumentTypes(methDesc)) {
- onType(t);
-
- if (found)
- return;
- }
- }
-
- /**
- * Processes dependencies of a class.
- *
- * @param depCls The class name as dot-notated FQN.
- */
- void onClass(final String depCls) {
- assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation.
- assert depCls.charAt(0) != 'L' : depCls;
- assert validateClassName(depCls) : depCls;
-
- if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes.
- return;
-
- if (visited.contains(depCls))
- return;
-
- Boolean res = cache.get(depCls);
-
- if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, this)))
- found = true;
- }
-
- /**
- * Analyses dependencies of given type.
- *
- * @param t The type to process.
- */
- void onType(Type t) {
- if (t == null)
- return;
-
- int sort = t.getSort();
-
- switch (sort) {
- case Type.ARRAY:
- onType(t.getElementType());
-
- break;
-
- case Type.OBJECT:
- onClass(t.getClassName());
-
- break;
- }
- }
-
- /**
- * Analyses dependencies of given object type.
- *
- * @param objType The object type to process.
- */
- void onInternalTypeName(String objType) {
- if (objType == null)
- return;
-
- assert objType.length() > 1 : objType;
-
- if (objType.charAt(0) == '[')
- // Handle array: this is type descriptor notation, like "[Ljava/lang/Object;".
- onType(objType);
- else {
- assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN.
-
- String clsName = objType.replace('/', '.'); // Convert it to dot notation.
-
- onClass(clsName); // Process.
- }
- }
-
- /**
- * Type description analyser.
- *
- * @param desc The description.
- */
- void onType(String desc) {
- if (!F.isEmpty(desc)) {
- if (desc.length() <= 1)
- return; // Optimization: filter out primitive types in early stage.
-
- Type t = Type.getType(desc);
-
- onType(t);
- }
- }
- }
-
- /**
- * Annotation visitor.
- */
- private static class CollectingAnnotationVisitor extends AnnotationVisitor {
- /** */
- final CollectingContext ctx;
-
- /**
- * Annotation visitor.
- *
- * @param ctx The collector.
- */
- CollectingAnnotationVisitor(CollectingContext ctx) {
- super(Opcodes.ASM4);
-
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitAnnotation(String name, String desc) {
- if (ctx.found)
- return null;
-
- ctx.onType(desc);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public void visitEnum(String name, String desc, String val) {
- if (ctx.found)
- return;
-
- ctx.onType(desc);
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitArray(String name) {
- return ctx.found ? null : this;
- }
-
- /** {@inheritDoc} */
- @Override public void visit(String name, Object val) {
- if (ctx.found)
- return;
-
- if (val instanceof Type)
- ctx.onType((Type)val);
- }
-
- /** {@inheritDoc} */
- @Override public void visitEnd() {
- // No-op.
- }
- }
-
- /**
- * Field visitor.
- */
- private static class CollectingFieldVisitor extends FieldVisitor {
- /** Collector. */
- private final CollectingContext ctx;
-
- /** Annotation visitor. */
- private final AnnotationVisitor av;
-
- /**
- * Constructor.
- */
- CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) {
- super(Opcodes.ASM4);
-
- this.ctx = ctx;
- this.av = av;
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- if (ctx.found)
- return null;
-
- ctx.onType(desc);
-
- return ctx.found ? null : av;
- }
-
- /** {@inheritDoc} */
- @Override public void visitAttribute(Attribute attr) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void visitEnd() {
- // No-op.
- }
- }
-
- /**
- * Class visitor.
- */
- private static class CollectingClassVisitor extends ClassVisitor {
- /** Collector. */
- private final CollectingContext ctx;
-
- /** Annotation visitor. */
- private final AnnotationVisitor av;
-
- /** Method visitor. */
- private final MethodVisitor mv;
-
- /** Field visitor. */
- private final FieldVisitor fv;
-
- /**
- * Constructor.
- *
- * @param ctx Collector.
- * @param av Annotation visitor.
- * @param mv Method visitor.
- * @param fv Field visitor.
- */
- CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) {
- super(Opcodes.ASM4);
-
- this.ctx = ctx;
- this.av = av;
- this.mv = mv;
- this.fv = fv;
- }
-
- /** {@inheritDoc} */
- @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) {
- if (ctx.found)
- return;
-
- ctx.onInternalTypeName(superName);
-
- if (ctx.found)
- return;
-
- if (ifaces != null) {
- for (String iface : ifaces) {
- ctx.onInternalTypeName(iface);
-
- if (ctx.found)
- return;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- if (ctx.found)
- return null;
-
- ctx.onType(desc);
-
- return ctx.found ? null : av;
- }
-
- /** {@inheritDoc} */
- @Override public void visitInnerClass(String name, String outerName, String innerName, int i) {
- if (ctx.found)
- return;
-
- ctx.onInternalTypeName(name);
- }
-
- /** {@inheritDoc} */
- @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) {
- if (ctx.found)
- return null;
-
- ctx.onType(desc);
-
- return ctx.found ? null : fv;
- }
-
- /** {@inheritDoc} */
- @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature,
- String[] exceptions) {
- if (ctx.found)
- return null;
-
- ctx.onMethodsDesc(desc);
-
- // Process declared method exceptions:
- if (exceptions != null) {
- for (String e : exceptions)
- ctx.onInternalTypeName(e);
- }
-
- return ctx.found ? null : mv;
- }
- }
-
- /**
- * Method visitor.
- */
- private static class CollectingMethodVisitor extends MethodVisitor {
- /** Collector. */
- private final CollectingContext ctx;
-
- /** Annotation visitor. */
- private final AnnotationVisitor av;
-
- /**
- * Constructor.
- *
- * @param ctx Collector.
- * @param av Annotation visitor.
- */
- private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) {
- super(Opcodes.ASM4);
-
- this.ctx = ctx;
- this.av = av;
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- if (ctx.found)
- return null;
-
- ctx.onType(desc);
-
- return ctx.found ? null : av;
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) {
- if (ctx.found)
- return null;
-
- ctx.onType(desc);
-
- return ctx.found ? null : av;
- }
-
- /** {@inheritDoc} */
- @Override public AnnotationVisitor visitAnnotationDefault() {
- return ctx.found ? null : av;
- }
-
- /** {@inheritDoc} */
- @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) {
- if (ctx.found)
- return;
-
- ctx.onInternalTypeName(owner);
-
- if (ctx.found)
- return;
-
- ctx.onType(desc);
- }
-
- /** {@inheritDoc} */
- @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void visitLocalVariable(String name, String desc, String signature, Label lb,
- Label lb2, int i) {
- if (ctx.found)
- return;
-
- ctx.onType(desc);
- }
-
- /** {@inheritDoc} */
- @Override public void visitMethodInsn(int i, String owner, String name, String desc) {
- if (ctx.found)
- return;
-
- ctx.onInternalTypeName(owner);
-
- if (ctx.found)
- return;
-
- ctx.onMethodsDesc(desc);
- }
-
- /** {@inheritDoc} */
- @Override public void visitMultiANewArrayInsn(String desc, int dim) {
- if (ctx.found)
- return;
-
- ctx.onType(desc);
- }
-
- /** {@inheritDoc} */
- @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) {
- if (ctx.found)
- return;
-
- ctx.onInternalTypeName(typeStr);
- }
-
- /** {@inheritDoc} */
- @Override public void visitTypeInsn(int opcode, String type) {
- if (ctx.found)
- return;
-
- ctx.onInternalTypeName(type);
- }
- }
-}
\ No newline at end of file
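[Editorial note] The loadFromBytes(...) method above (and its successor, HadoopHelperImpl.loadReplace(...) later in this commit) relies on ASM to re-register a replacement class under the original Hadoop class name. Below is a minimal, self-contained sketch of that remapping step, assuming ASM 4.x (asm and asm-commons) on the classpath; the class name RemapSketch and its method are illustrative, not part of the commit.

    import java.io.InputStream;

    import org.objectweb.asm.ClassReader;
    import org.objectweb.asm.ClassWriter;
    import org.objectweb.asm.Opcodes;
    import org.objectweb.asm.commons.Remapper;
    import org.objectweb.asm.commons.RemappingClassAdapter;

    public class RemapSketch {
        /**
         * Reads the bytecode of {@code replaceCls} and rewrites every reference to its own
         * internal name into {@code targetCls}, so the result can be handed to defineClass(...)
         * under the target name.
         */
        public static byte[] remap(ClassLoader ldr, String replaceCls, String targetCls) throws Exception {
            final String from = replaceCls.replace('.', '/');
            final String to = targetCls.replace('.', '/');

            try (InputStream in = ldr.getResourceAsStream(from + ".class")) {
                if (in == null)
                    throw new ClassNotFoundException(replaceCls);

                ClassReader rdr = new ClassReader(in);
                ClassWriter w = new ClassWriter(Opcodes.ASM4);

                // Visit the replacement class, renaming its own internal name on the fly.
                rdr.accept(new RemappingClassAdapter(w, new Remapper() {
                    @Override public String map(String type) {
                        return type.equals(from) ? to : type;
                    }
                }), ClassReader.EXPAND_FRAMES);

                return w.toByteArray();
            }
        }
    }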
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java
deleted file mode 100644
index 4069496..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-/**
- * Main class to compose Hadoop classpath depending on the environment.
- * This class is designed to be as independent of Ignite classes as possible.
- * Please make sure to pass the path separator character as the 1st parameter to the main method.
- */
-public class HadoopClasspathMain {
- /**
- * Main method to be executed from scripts. It prints the classpath to the standard output.
- *
- * @param args The 1st argument should be the path separator character (":" on Linux, ";" on Windows).
- */
- public static void main(String[] args) throws Exception {
- if (args.length < 1)
- throw new IllegalArgumentException("Path separator must be passed as the first argument.");
-
- String separator = args[0];
-
- StringBuilder sb = new StringBuilder();
-
- for (String path : HadoopClasspathUtils.classpathForProcess())
- sb.append(path).append(separator);
-
- System.out.println(sb);
- }
-}
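[Editorial note] A hedged usage sketch: the entry point above is meant to be invoked from launcher scripts, but it can also be driven directly from Java. The single argument is the platform path separator, and a resolvable Hadoop installation (e.g. HADOOP_HOME) is assumed; the wrapper class name below is illustrative.

    import java.io.File;

    import org.apache.ignite.internal.processors.hadoop.HadoopClasspathMain;

    public class ClasspathPrintSketch {
        public static void main(String[] args) throws Exception {
            // Prints the resolved Hadoop classpath to standard output, exactly as a
            // launcher script capturing the output of HadoopClasspathMain would see it.
            HadoopClasspathMain.main(new String[] { File.pathSeparator });
        }
    }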
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java
deleted file mode 100644
index f5c2814..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Hadoop classpath utilities.
- */
-public class HadoopClasspathUtils {
- /** Prefix directory. */
- public static final String PREFIX = "HADOOP_PREFIX";
-
- /** Hadoop home directory. */
- public static final String HOME = "HADOOP_HOME";
-
- /** Hadoop common directory. */
- public static final String COMMON_HOME = "HADOOP_COMMON_HOME";
-
- /** Hadoop HDFS directory. */
- public static final String HDFS_HOME = "HADOOP_HDFS_HOME";
-
- /** Hadoop mapred directory. */
- public static final String MAPRED_HOME = "HADOOP_MAPRED_HOME";
-
- /** Arbitrary additional dependencies. Compliant with standard Java classpath resolution. */
- public static final String HADOOP_USER_LIBS = "HADOOP_USER_LIBS";
-
- /** Empty string. */
- private static final String EMPTY_STR = "";
-
- /**
- * Gets the Hadoop classpath as a list of classpath elements for an external process.
- *
- * @return List of the class path elements.
- * @throws IOException If failed.
- */
- public static List<String> classpathForProcess() throws IOException {
- List<String> res = new ArrayList<>();
-
- for (final SearchDirectory dir : classpathDirectories()) {
- File[] files = dir.files();
-
- if (dir.useWildcard()) {
- if (files.length > 0)
- res.add(dir.absolutePath() + File.separator + '*');
- }
- else {
- for (File file : files)
- res.add(file.getAbsolutePath());
- }
- }
-
- return res;
- }
-
- /**
- * Gets Hadoop class path as a list of URLs (for in-process class loader usage).
- *
- * @return List of class path URLs.
- * @throws IOException If failed.
- */
- public static List<URL> classpathForClassLoader() throws IOException {
- List<URL> res = new ArrayList<>();
-
- for (SearchDirectory dir : classpathDirectories()) {
- for (File file : dir.files()) {
- try {
- res.add(file.toURI().toURL());
- }
- catch (MalformedURLException e) {
- throw new IOException("Failed to convert file path to URL: " + file.getPath());
- }
- }
- }
-
- return res;
- }
-
- /**
- * Gets Hadoop locations.
- *
- * @return The locations as determined from the environment.
- */
- public static HadoopLocations locations() throws IOException {
- // Query environment.
- String hadoopHome = systemOrEnv(PREFIX, systemOrEnv(HOME, EMPTY_STR));
-
- String commonHome = systemOrEnv(COMMON_HOME, EMPTY_STR);
- String hdfsHome = systemOrEnv(HDFS_HOME, EMPTY_STR);
- String mapredHome = systemOrEnv(MAPRED_HOME, EMPTY_STR);
-
- // If any of the component locations is defined, use only those.
- if (!isEmpty(commonHome) || !isEmpty(hdfsHome) || !isEmpty(mapredHome)) {
- HadoopLocations res = new HadoopLocations(hadoopHome, commonHome, hdfsHome, mapredHome);
-
- if (res.valid())
- return res;
- else
- throw new IOException("Failed to resolve Hadoop classpath because some environment variables are " +
- "either undefined or point to nonexistent directories [" +
- "[env=" + COMMON_HOME + ", value=" + commonHome + ", exists=" + res.commonExists() + "], " +
- "[env=" + HDFS_HOME + ", value=" + hdfsHome + ", exists=" + res.hdfsExists() + "], " +
- "[env=" + MAPRED_HOME + ", value=" + mapredHome + ", exists=" + res.mapredExists() + "]]");
- }
- else if (!isEmpty(hadoopHome)) {
- // All further checks will be based on HADOOP_HOME, so check for its existence.
- if (!exists(hadoopHome))
- throw new IOException("Failed to resolve Hadoop classpath because " + HOME + " environment " +
- "variable points to nonexistent directory: " + hadoopHome);
-
- // Probe Apache Hadoop.
- HadoopLocations res = new HadoopLocations(
- hadoopHome,
- hadoopHome + "/share/hadoop/common",
- hadoopHome + "/share/hadoop/hdfs",
- hadoopHome + "/share/hadoop/mapreduce"
- );
-
- if (res.valid())
- return res;
-
- // Probe CDH.
- res = new HadoopLocations(
- hadoopHome,
- hadoopHome,
- hadoopHome + "/../hadoop-hdfs",
- hadoopHome + "/../hadoop-mapreduce"
- );
-
- if (res.valid())
- return res;
-
- // Probe HDP.
- res = new HadoopLocations(
- hadoopHome,
- hadoopHome,
- hadoopHome + "/../hadoop-hdfs-client",
- hadoopHome + "/../hadoop-mapreduce-client"
- );
-
- if (res.valid())
- return res;
-
- // Failed.
- throw new IOException("Failed to resolve Hadoop classpath because " + HOME + " environment variable " +
- "is either invalid or points to non-standard Hadoop distribution: " + hadoopHome);
- }
- else {
- // Advise setting only HADOOP_HOME, as this is the preferred way to configure the classpath.
- throw new IOException("Failed to resolve Hadoop classpath (please define " + HOME + " environment " +
- "variable and point it to your Hadoop distribution).");
- }
- }
-
- /**
- * Gets base directories to discover classpath elements in.
- *
- * @return Collection of directory and mask pairs.
- * @throws IOException if a mandatory classpath location is not found.
- */
- private static Collection<SearchDirectory> classpathDirectories() throws IOException {
- HadoopLocations loc = locations();
-
- Collection<SearchDirectory> res = new ArrayList<>();
-
- res.add(new SearchDirectory(new File(loc.common(), "lib"), AcceptAllDirectoryFilter.INSTANCE));
- res.add(new SearchDirectory(new File(loc.hdfs(), "lib"), AcceptAllDirectoryFilter.INSTANCE));
- res.add(new SearchDirectory(new File(loc.mapred(), "lib"), AcceptAllDirectoryFilter.INSTANCE));
-
- res.add(new SearchDirectory(new File(loc.common()), new PrefixDirectoryFilter("hadoop-common-")));
- res.add(new SearchDirectory(new File(loc.common()), new PrefixDirectoryFilter("hadoop-auth-")));
-
- res.add(new SearchDirectory(new File(loc.hdfs()), new PrefixDirectoryFilter("hadoop-hdfs-")));
-
- res.add(new SearchDirectory(new File(loc.mapred()),
- new PrefixDirectoryFilter("hadoop-mapreduce-client-common")));
- res.add(new SearchDirectory(new File(loc.mapred()),
- new PrefixDirectoryFilter("hadoop-mapreduce-client-core")));
-
- res.addAll(parseUserLibs());
-
- return res;
- }
-
- /**
- * Parse user libs.
- *
- * @return Parsed libs search patterns.
- * @throws IOException If failed.
- */
- static Collection<SearchDirectory> parseUserLibs() throws IOException {
- return parseUserLibs(systemOrEnv(HADOOP_USER_LIBS, null));
- }
-
- /**
- * Parse user libs.
- *
- * @param str String.
- * @return Result.
- * @throws IOException If failed.
- */
- static Collection<SearchDirectory> parseUserLibs(String str) throws IOException {
- Collection<SearchDirectory> res = new LinkedList<>();
-
- if (!isEmpty(str)) {
- String[] tokens = normalize(str).split(File.pathSeparator);
-
- for (String token : tokens) {
- // Skip empty tokens.
- if (isEmpty(token))
- continue;
-
- File file = new File(token);
- File dir = file.getParentFile();
-
- if (token.endsWith("*")) {
- assert dir != null;
-
- res.add(new SearchDirectory(dir, AcceptAllDirectoryFilter.INSTANCE, false));
- }
- else {
- // Bare root such as "/" or "C:\" has no parent directory; nothing to do with it.
- if (dir == null)
- continue;
-
- res.add(new SearchDirectory(dir, new ExactDirectoryFilter(file.getName()), false));
- }
- }
- }
-
- return res;
- }
-
- /**
- * Get system property or environment variable with the given name.
- *
- * @param name Variable name.
- * @param dflt Default value.
- * @return Value.
- */
- private static String systemOrEnv(String name, String dflt) {
- String res = System.getProperty(name);
-
- if (res == null)
- res = System.getenv(name);
-
- return res != null ? res : dflt;
- }
-
- /**
- * Answers whether the given path denotes an existing, readable directory.
- *
- * @param path The directory path.
- * @return {@code True} if the given path denotes an existing directory.
- */
- public static boolean exists(String path) {
- if (path == null)
- return false;
-
- Path p = Paths.get(path);
-
- return Files.exists(p) && Files.isDirectory(p) && Files.isReadable(p);
- }
-
- /**
- * Check if string is empty.
- *
- * @param val Value.
- * @return {@code True} if empty.
- */
- private static boolean isEmpty(String val) {
- return val == null || val.isEmpty();
- }
-
- /**
- * Normalize the string.
- *
- * @param str String.
- * @return Normalized string.
- */
- private static String normalize(String str) {
- assert str != null;
-
- return str.trim().toLowerCase();
- }
-
- /**
- * Simple pair-like structure to hold directory name and a mask assigned to it.
- */
- static class SearchDirectory {
- /** File. */
- private final File dir;
-
- /** Filter. */
- private final DirectoryFilter filter;
-
- /** Whether directory must exist. */
- private final boolean strict;
-
- /**
- * Constructor for directory search with strict rule.
- *
- * @param dir Directory.
- * @param filter Filter.
- * @throws IOException If failed.
- */
- private SearchDirectory(File dir, DirectoryFilter filter) throws IOException {
- this(dir, filter, true);
- }
-
- /**
- * Constructor.
- *
- * @param dir Directory.
- * @param filter Filter.
- * @param strict Whether directory must exist.
- * @throws IOException If failed.
- */
- private SearchDirectory(File dir, DirectoryFilter filter, boolean strict) throws IOException {
- this.dir = dir;
- this.filter = filter;
- this.strict = strict;
-
- if (strict && !exists(dir.getAbsolutePath()))
- throw new IOException("Directory cannot be read: " + dir.getAbsolutePath());
- }
-
- /**
- * @return Absolute path.
- */
- String absolutePath() {
- return dir.getAbsolutePath();
- }
-
- /**
- * @return Child files.
- */
- File[] files() throws IOException {
- File[] files = dir.listFiles(new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return filter.test(name);
- }
- });
-
- if (files == null) {
- if (strict)
- throw new IOException("Failed to get directory files [dir=" + dir + ']');
- else
- return new File[0];
- }
- else
- return files;
- }
-
- /**
- * @return {@code True} if wildcard can be used.
- */
- boolean useWildcard() {
- return filter instanceof AcceptAllDirectoryFilter;
- }
- }
-
- /**
- * Directory filter interface.
- */
- static interface DirectoryFilter {
- /**
- * Test if file with this name should be included.
- *
- * @param name File name.
- * @return {@code True} if passed.
- */
- public boolean test(String name);
- }
-
- /**
- * Filter to accept all files.
- */
- static class AcceptAllDirectoryFilter implements DirectoryFilter {
- /** Singleton instance. */
- public static final AcceptAllDirectoryFilter INSTANCE = new AcceptAllDirectoryFilter();
-
- /** {@inheritDoc} */
- @Override public boolean test(String name) {
- return true;
- }
- }
-
- /**
- * Filter which uses prefix to filter files.
- */
- static class PrefixDirectoryFilter implements DirectoryFilter {
- /** Prefix. */
- private final String prefix;
-
- /**
- * Constructor.
- *
- * @param prefix Prefix.
- */
- public PrefixDirectoryFilter(String prefix) {
- assert prefix != null;
-
- this.prefix = normalize(prefix);
- }
-
- /** {@inheritDoc} */
- @Override public boolean test(String name) {
- return normalize(name).startsWith(prefix);
- }
- }
-
- /**
- * Filter which uses exact comparison.
- */
- static class ExactDirectoryFilter implements DirectoryFilter {
- /** Name. */
- private final String name;
-
- /**
- * Constructor.
- *
- * @param name Name.
- */
- public ExactDirectoryFilter(String name) {
- this.name = normalize(name);
- }
-
- /** {@inheritDoc} */
- @Override public boolean test(String name) {
- return normalize(name).equals(this.name);
- }
- }
-}
\ No newline at end of file
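[Editorial note] As a quick illustration of the HADOOP_USER_LIBS format accepted by parseUserLibs(...) above: entries are separated by the platform path separator, a trailing '*' selects every file in a directory, and a plain path selects a single file. The sketch below assumes it lives in the same package, since parseUserLibs(String) and SearchDirectory are package-private; the paths are illustrative.

    package org.apache.ignite.internal.processors.hadoop;

    import java.io.File;

    public class UserLibsSketch {
        public static void main(String[] args) throws Exception {
            // One wildcard entry and one exact-file entry, separated by the platform separator.
            String libs = "/opt/libs/*" + File.pathSeparator + "/opt/extra/custom-codec.jar";

            // Wildcard -> every file under /opt/libs; plain path -> only custom-codec.jar.
            for (HadoopClasspathUtils.SearchDirectory dir : HadoopClasspathUtils.parseUserLibs(libs))
                System.out.println(dir.absolutePath());
        }
    }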
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
new file mode 100644
index 0000000..37af147
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+/**
+ * Common Hadoop utility methods which do not depend on the Hadoop API.
+ */
+public class HadoopCommonUtils {
+ /** Job class name. */
+ public static final String JOB_CLS_NAME = "org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job";
+
+ /** Property to store timestamp of new job id request. */
+ public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
+
+ /** Property to store timestamp of response of new job id request. */
+ public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
+
+ /** Property to store timestamp of job submission. */
+ public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
+
+ /** Property to set custom writer of job statistics. */
+ public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
+
+ /**
+ * Sort input splits by length in descending order (largest first).
+ *
+ * @param splits Splits.
+ * @return Sorted splits.
+ */
+ public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
+ int id = 0;
+
+ TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
+
+ for (HadoopInputSplit split : splits) {
+ long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0;
+
+ sortedSplits.add(new SplitSortWrapper(id++, split, len));
+ }
+
+ ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
+
+ for (SplitSortWrapper sortedSplit : sortedSplits)
+ res.add(sortedSplit.split);
+
+ return res;
+ }
+
+ /**
+ * Set context class loader.
+ *
+ * @param newLdr New class loader.
+ * @return Old class loader.
+ */
+ @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) {
+ ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();
+
+ if (newLdr != oldLdr)
+ Thread.currentThread().setContextClassLoader(newLdr);
+
+ return oldLdr;
+ }
+
+ /**
+ * Restore context class loader.
+ *
+ * @param oldLdr Original class loader.
+ */
+ public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) {
+ ClassLoader newLdr = Thread.currentThread().getContextClassLoader();
+
+ if (newLdr != oldLdr)
+ Thread.currentThread().setContextClassLoader(oldLdr);
+ }
+
+ /**
+ * Split wrapper for sorting.
+ */
+ private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
+ /** Unique ID. */
+ private final int id;
+
+ /** Split. */
+ private final HadoopInputSplit split;
+
+ /** Split length. */
+ private final long len;
+
+ /**
+ * Constructor.
+ *
+ * @param id Unique ID.
+ * @param split Split.
+ * @param len Split length.
+ */
+ public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+ this.id = id;
+ this.split = split;
+ this.len = len;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
+ @Override public int compareTo(SplitSortWrapper other) {
+ long res = len - other.len;
+
+ if (res > 0)
+ return -1;
+ else if (res < 0)
+ return 1;
+ else
+ return id - other.id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
+ }
+ }
+
+ /**
+ * Private constructor.
+ */
+ private HadoopCommonUtils() {
+ // No-op.
+ }
+}
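[Editorial note] The two class-loader helpers above are designed to be used as a save/restore pair around Hadoop-facing work, since Hadoop resolves many classes through the thread context class loader. A minimal sketch of the intended pattern; jobLdr and hadoopWork are illustrative names.

    // Illustrative: jobLdr would be a HadoopClassLoader created for a specific job.
    void runWithJobLoader(ClassLoader jobLdr, Runnable hadoopWork) {
        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobLdr);

        try {
            // Hadoop resolves many classes via the thread context class loader.
            hadoopWork.run();
        }
        finally {
            // Restore unconditionally so the worker thread is left as it was found.
            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
        }
    }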
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
index 42a3d72..4326ad2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
deleted file mode 100644
index 1382c1f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop job info based on default Hadoop configuration.
- */
-public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
- /** */
- private static final long serialVersionUID = 5489900236464999951L;
-
- /** {@code true} If job has combiner. */
- private boolean hasCombiner;
-
- /** Number of reducers configured for job. */
- private int numReduces;
-
- /** Configuration. */
- private Map<String,String> props = new HashMap<>();
-
- /** Job name. */
- private String jobName;
-
- /** User name. */
- private String user;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public HadoopDefaultJobInfo() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param jobName Job name.
- * @param user User name.
- * @param hasCombiner {@code true} If job has combiner.
- * @param numReduces Number of reducers configured for job.
- * @param props All other properties of the job.
- */
- public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
- Map<String, String> props) {
- this.jobName = jobName;
- this.user = user;
- this.hasCombiner = hasCombiner;
- this.numReduces = numReduces;
- this.props = props;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String property(String name) {
- return props.get(name);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
- @Nullable String[] libNames) throws IgniteCheckedException {
- assert jobCls != null;
-
- try {
- Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
- HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class);
-
- return constructor.newInstance(jobId, this, log, libNames);
- }
- catch (Throwable t) {
- if (t instanceof Error)
- throw (Error)t;
-
- throw new IgniteCheckedException(t);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasCombiner() {
- return hasCombiner;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasReducer() {
- return reducers() > 0;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- return numReduces;
- }
-
- /** {@inheritDoc} */
- @Override public String jobName() {
- return jobName;
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- return user;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, jobName);
- U.writeString(out, user);
-
- out.writeBoolean(hasCombiner);
- out.writeInt(numReduces);
-
- U.writeStringMap(out, props);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobName = U.readString(in);
- user = U.readString(in);
-
- hasCombiner = in.readBoolean();
- numReduces = in.readInt();
-
- props = U.readStringMap(in);
- }
-
- /**
- * @return Properties of the job.
- */
- public Map<String, String> properties() {
- return props;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
new file mode 100644
index 0000000..bd767b3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Split serialized in external file.
+ */
+public class HadoopExternalSplit extends HadoopInputSplit {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long off;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public HadoopExternalSplit() {
+ // No-op.
+ }
+
+ /**
+ * @param hosts Hosts.
+ * @param off Offset of this split in external file.
+ */
+ public HadoopExternalSplit(String[] hosts, long off) {
+ assert off >= 0 : off;
+ assert hosts != null;
+
+ this.hosts = hosts;
+ this.off = off;
+ }
+
+ /**
+ * @return Offset of this input split in external file.
+ */
+ public long offset() {
+ return off;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(off);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ off = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ HadoopExternalSplit that = (HadoopExternalSplit) o;
+
+ return off == that.off;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(off ^ (off >>> 32));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java
new file mode 100644
index 0000000..71bb8a4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.jetbrains.annotations.Nullable;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.ClassWriter;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.commons.Remapper;
+import org.objectweb.asm.commons.RemappingClassAdapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Utility methods for the Hadoop class loader, kept here to avoid direct 3rd-party dependencies in the class loader itself.
+ */
+public class HadoopHelperImpl implements HadoopHelper {
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** Common class loader. */
+ private volatile HadoopClassLoader ldr;
+
+ /**
+ * Default constructor.
+ */
+ public HadoopHelperImpl() {
+ this(null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+ public HadoopHelperImpl(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isNoOp() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopClassLoader commonClassLoader() {
+ HadoopClassLoader res = ldr;
+
+ if (res == null) {
+ synchronized (this) {
+ res = ldr;
+
+ if (res == null) {
+ String[] libNames = null;
+
+ if (ctx != null && ctx.config().getHadoopConfiguration() != null)
+ libNames = ctx.config().getHadoopConfiguration().getNativeLibraryNames();
+
+ res = new HadoopClassLoader(null, "hadoop-common", libNames, this);
+
+ ldr = res;
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] loadReplace(InputStream in, final String originalName, final String replaceName) {
+ ClassReader rdr;
+
+ try {
+ rdr = new ClassReader(in);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // ClassWriter takes bit flags (COMPUTE_MAXS/COMPUTE_FRAMES), not an ASM API version;
+ // 0 carries the original maxs and (expanded) frames over from the input class.
+ ClassWriter w = new ClassWriter(0);
+
+ rdr.accept(new RemappingClassAdapter(w, new Remapper() {
+ /** Internal name of the replacing class. */
+ String replaceType = replaceName.replace('.', '/');
+
+ /** Internal name of the original class the patched bytes will be defined under. */
+ String nameType = originalName.replace('.', '/');
+
+ @Override public String map(String type) {
+ if (type.equals(replaceType))
+ return nameType;
+
+ return type;
+ }
+ }), ClassReader.EXPAND_FRAMES);
+
+ return w.toByteArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public InputStream loadClassBytes(ClassLoader ldr, String clsName) {
+ return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class");
+ }
+}
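
The remapping in loadReplace() above is the heart of the new class loading architecture: the bytecode of a replacement class is read and every reference to replaceName is rewritten to originalName, so the patched bytes can then be defined under the original class name. A self-contained sketch of the technique against the same ASM 4.x commons API (demo class names are made up):

    import java.io.InputStream;

    import org.objectweb.asm.ClassReader;
    import org.objectweb.asm.ClassWriter;
    import org.objectweb.asm.commons.Remapper;
    import org.objectweb.asm.commons.RemappingClassAdapter;

    public class RenameDemo {
        /** Rewrites all references to 'from' into 'to' and returns the patched bytecode. */
        static byte[] rename(InputStream in, String from, String to) throws Exception {
            final String fromType = from.replace('.', '/');
            final String toType = to.replace('.', '/');

            ClassReader rdr = new ClassReader(in);

            // Flags 0: maxs and frames are carried over from the input class.
            ClassWriter w = new ClassWriter(0);

            rdr.accept(new RemappingClassAdapter(w, new Remapper() {
                @Override public String map(String type) {
                    return type.equals(fromType) ? toType : type;
                }
            }), ClassReader.EXPAND_FRAMES);

            return w.toByteArray();
        }

        public static void main(String[] args) throws Exception {
            // Remap this demo class itself; the resource lookup works on any classpath.
            InputStream in = RenameDemo.class.getResourceAsStream("RenameDemo.class");

            byte[] patched = rename(in, "RenameDemo", "demo.PatchedDemo");

            System.out.println("Patched class size: " + patched.length);
        }
    }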
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java
deleted file mode 100644
index a90007f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-/**
- * Simple structure to hold Hadoop directory locations.
- */
-public class HadoopLocations {
- /** Hadoop home. */
- private final String home;
-
- /** Common home. */
- private final String common;
-
- /** HDFS home. */
- private final String hdfs;
-
- /** Mapred home. */
- private final String mapred;
-
- /** Whether common home exists. */
- private final boolean commonExists;
-
- /** Whether HDFS home exists. */
- private final boolean hdfsExists;
-
- /** Whether mapred home exists. */
- private final boolean mapredExists;
-
- /**
- * Constructor.
- *
- * @param home Hadoop home.
- * @param common Common home.
- * @param hdfs HDFS home.
- * @param mapred Mapred home.
- */
- public HadoopLocations(String home, String common, String hdfs, String mapred) {
- assert common != null && hdfs != null && mapred != null;
-
- this.home = home;
- this.common = common;
- this.hdfs = hdfs;
- this.mapred = mapred;
-
- commonExists = HadoopClasspathUtils.exists(common);
- hdfsExists = HadoopClasspathUtils.exists(hdfs);
- mapredExists = HadoopClasspathUtils.exists(mapred);
- }
-
- /**
- * @return Hadoop home.
- */
- public String home() {
- return home;
- }
-
- /**
- * @return Common home.
- */
- public String common() {
- return common;
- }
-
- /**
- * @return HDFS home.
- */
- public String hdfs() {
- return hdfs;
- }
-
- /**
- * @return Mapred home.
- */
- public String mapred() {
- return mapred;
- }
-
- /**
- * @return Whether common home exists.
- */
- public boolean commonExists() {
- return commonExists;
- }
-
- /**
- * @return Whether HDFS home exists.
- */
- public boolean hdfsExists() {
- return hdfsExists;
- }
-
- /**
- * @return Whether mapred home exists.
- */
- public boolean mapredExists() {
- return mapredExists;
- }
-
- /**
- * Whether all required directories exist.
- *
- * @return {@code True} if exists.
- */
- public boolean valid() {
- return commonExists && hdfsExists && mapredExists;
- }
-}
\ No newline at end of file
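
HadoopLocations itself is a plain value object; the interesting work happens in its callers, which resolve the three module homes (typically from HADOOP_HOME, or from the dedicated HADOOP_COMMON_HOME / HADOOP_HDFS_HOME / HADOOP_MAPRED_HOME variables) and then probe for existence, which is what the *Exists flags above capture. A rough standalone sketch of that resolution, assuming the standard Apache Hadoop 2.x directory layout (demo names hypothetical):

    import java.io.File;

    public class LocationsDemo {
        public static void main(String[] args) {
            String home = System.getenv("HADOOP_HOME");

            if (home == null)
                throw new IllegalStateException("HADOOP_HOME is not set.");

            // Apache Hadoop 2.x layout: module homes live under share/hadoop.
            String common = home + "/share/hadoop/common";
            String hdfs = home + "/share/hadoop/hdfs";
            String mapred = home + "/share/hadoop/mapreduce";

            boolean valid = new File(common).isDirectory()
                && new File(hdfs).isDirectory()
                && new File(mapred).isDirectory();

            System.out.println("Hadoop layout valid: " + valid);
        }
    }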
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
deleted file mode 100644
index 4e03e17..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
-
-/**
- * Hadoop counter group adapter.
- */
-class HadoopMapReduceCounterGroup implements CounterGroup {
- /** Counters. */
- private final HadoopMapReduceCounters cntrs;
-
- /** Group name. */
- private final String name;
-
- /**
- * Creates new instance.
- *
- * @param cntrs Client counters instance.
- * @param name Group name.
- */
- HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
- this.cntrs = cntrs;
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public String getDisplayName() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public void setDisplayName(String displayName) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void addCounter(Counter counter) {
- addCounter(counter.getName(), counter.getDisplayName(), 0);
- }
-
- /** {@inheritDoc} */
- @Override public Counter addCounter(String name, String displayName, long value) {
- final Counter counter = cntrs.findCounter(this.name, name);
-
- counter.setValue(value);
-
- return counter;
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName, String displayName) {
- return cntrs.findCounter(name, counterName);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName, boolean create) {
- return cntrs.findCounter(name, counterName, create);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName) {
- return cntrs.findCounter(name, counterName);
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return cntrs.groupSize(name);
- }
-
- /** {@inheritDoc} */
- @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
- for (final Counter counter : rightGroup)
- cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
- }
-
- /** {@inheritDoc} */
- @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Counter> iterator() {
- return cntrs.iterateGroup(name);
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-}
\ No newline at end of file
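
The group adapter above is a stateless view: it keeps nothing but its name and routes every lookup and mutation to the shared HadoopMapReduceCounters instance, so client code sees the plain Hadoop counter API. A small usage sketch against the stock mapreduce API (group and counter names are made up; any Counters implementation works, including the adapter pair in this commit):

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;

    public class CounterDemo {
        /** Increments one counter and dumps the group; oblivious to the Counters implementation. */
        static void bumpAndDump(Counters cntrs) {
            CounterGroup grp = cntrs.getGroup("my-group");

            grp.findCounter("records", true).increment(1);

            for (Counter c : grp)
                System.out.println(c.getName() + " = " + c.getValue());
        }

        public static void main(String[] args) {
            bumpAndDump(new Counters()); // Stock Hadoop counters behave the same way.
        }
    }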
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
deleted file mode 100644
index 57a853f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.FileSystemCounter;
-import org.apache.hadoop.mapreduce.counters.AbstractCounters;
-import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- * Hadoop counters adapter.
- */
-public class HadoopMapReduceCounters extends Counters {
- /** */
- private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
-
- /**
- * Creates new instance based on given counters.
- *
- * @param cntrs Counters to adapt.
- */
- public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
- for (HadoopCounter cntr : cntrs.all())
- if (cntr instanceof HadoopLongCounter)
- this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
- return addGroup(grp.getName(), grp.getDisplayName());
- }
-
- /** {@inheritDoc} */
- @Override public CounterGroup addGroup(String name, String displayName) {
- return new HadoopMapReduceCounterGroup(this, name);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String grpName, String cntrName) {
- return findCounter(grpName, cntrName, true);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Counter findCounter(Enum<?> key) {
- return findCounter(key.getDeclaringClass().getName(), key.name(), true);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
- return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Iterable<String> getGroupNames() {
- Collection<String> res = new HashSet<>();
-
- for (HadoopCounter counter : cntrs.values())
- res.add(counter.group());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<CounterGroup> iterator() {
- final Iterator<String> iter = getGroupNames().iterator();
-
- return new Iterator<CounterGroup>() {
- @Override public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override public CounterGroup next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException("not implemented");
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CounterGroup getGroup(String grpName) {
- return new HadoopMapReduceCounterGroup(this, grpName);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int countCounters() {
- return cntrs.size();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
- for (CounterGroup group : other) {
- for (Counter counter : group) {
- findCounter(group.getName(), counter.getName()).increment(counter.getValue());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object genericRight) {
- if (!(genericRight instanceof HadoopMapReduceCounters))
- return false;
-
- return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return cntrs.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public void setWriteAllCounters(boolean snd) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean getWriteAllCounters() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Limits limits() {
- return null;
- }
-
- /**
- * Returns size of a group.
- *
- * @param grpName Name of the group.
- * @return Number of counters in the given group.
- */
- public int groupSize(String grpName) {
- int res = 0;
-
- for (HadoopCounter counter : cntrs.values()) {
- if (grpName.equals(counter.group()))
- res++;
- }
-
- return res;
- }
-
- /**
- * Returns counters iterator for specified group.
- *
- * @param grpName Name of the group to iterate.
- * @return Counters iterator.
- */
- public Iterator<Counter> iterateGroup(String grpName) {
- Collection<Counter> grpCounters = new ArrayList<>();
-
- for (HadoopLongCounter counter : cntrs.values()) {
- if (grpName.equals(counter.group()))
- grpCounters.add(new HadoopV2Counter(counter));
- }
-
- return grpCounters.iterator();
- }
-
- /**
- * Find a counter in the group.
- *
- * @param grpName The name of the counter group.
- * @param cntrName The name of the counter.
- * @param create Whether to create the counter if it is not found.
- * @return The counter that was found or created, or {@code null} if it was not found and {@code create} is {@code false}.
- */
- public Counter findCounter(String grpName, String cntrName, boolean create) {
- T2<String, String> key = new T2<>(grpName, cntrName);
-
- HadoopLongCounter internalCntr = cntrs.get(key);
-
- if (internalCntr == null && create) {
- internalCntr = new HadoopLongCounter(grpName, cntrName);
-
- cntrs.put(key, internalCntr); // Store the same instance we wrap below, so increments are visible.
- }
-
- return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
- }
-}
\ No newline at end of file
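
One subtlety in findCounter(grpName, cntrName, create) above: the instance stored in the map must be the very instance the returned HadoopV2Counter wraps, otherwise increments land on an object nobody ever reads again. The safe create-if-missing shape, reduced to its essentials (hypothetical demo):

    import java.util.HashMap;
    import java.util.Map;

    public class CreateIfMissingDemo {
        static final Map<String, long[]> CNTRS = new HashMap<>();

        /** Returns the live counter cell, creating it on demand; never hands out a detached copy. */
        static long[] counter(String key, boolean create) {
            long[] cell = CNTRS.get(key);

            if (cell == null && create) {
                cell = new long[1];

                CNTRS.put(key, cell); // Store the same instance we return.
            }

            return cell;
        }

        public static void main(String[] args) {
            counter("records", true)[0]++;

            System.out.println(CNTRS.get("records")[0]); // Prints 1; a detached copy would print 0.
        }
    }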
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
index b9c20c3..f0df1e9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -40,6 +40,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* Hadoop processor.
*/
public class HadoopProcessor extends HadoopProcessorAdapter {
+ /** Name of the class used to probe for Hadoop libraries in the Ignite classpath. */
+ private static final String HADOOP_PROBE_CLS = "org.apache.hadoop.conf.Configuration";
+
/** Job ID counter. */
private final AtomicInteger idCtr = new AtomicInteger();
@@ -164,7 +167,14 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
- return hctx.jobTracker().submit(jobId, jobInfo);
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+ try {
+ return hctx.jobTracker().submit(jobId, jobInfo);
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
}
/** {@inheritDoc} */
@@ -203,6 +213,26 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
throw new IgniteCheckedException(ioe.getMessage(), ioe);
}
+ // Check if Hadoop is in parent class loader classpath.
+ try {
+ Class<?> cls = Class.forName(HADOOP_PROBE_CLS, false, getClass().getClassLoader());
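+ // Note: initialize=false above means the probe only tests visibility from the
+ // given loader; it does not run static initializers of the probed class.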
+
+ try {
+ String path = cls.getProtectionDomain().getCodeSource().getLocation().toString();
+
+ U.warn(log, "Hadoop libraries are found in Ignite classpath, this could lead to class loading " +
+ "errors (please remove all Hadoop libraries from Ignite classpath) [path=" + path + ']');
+ }
+ catch (Throwable ignore) {
+ U.warn(log, "Hadoop libraries are found in Ignite classpath, this could lead to class loading " +
+ "errors (please remove all Hadoop libraries from Ignite classpath)");
+ }
+ }
+ catch (Throwable ignore) {
+ // Hadoop is not visible from the parent class loader, which is the expected state.
+ }
+
+ // Try assembling Hadoop URLs.
HadoopClassLoader.hadoopUrls();
}
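
The submit() change above pins the processor's own class loader as the thread context class loader for the duration of the call and restores the previous one in a finally block, so Hadoop code resolving classes through the context loader sees the right one. The save/restore pattern itself, independent of Ignite's HadoopCommonUtils helpers (demo names hypothetical):

    public class CtxLoaderDemo {
        /** Runs 'task' with 'ldr' as the context class loader, restoring the previous one afterwards. */
        static void runWith(ClassLoader ldr, Runnable task) {
            Thread t = Thread.currentThread();

            ClassLoader old = t.getContextClassLoader();

            t.setContextClassLoader(ldr);

            try {
                task.run();
            }
            finally {
                t.setContextClassLoader(old); // Always restore, even if the task throws.
            }
        }

        public static void main(String[] args) {
            runWith(CtxLoaderDemo.class.getClassLoader(), new Runnable() {
                @Override public void run() {
                    System.out.println(Thread.currentThread().getContextClassLoader());
                }
            });
        }
    }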