You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/04 20:28:59 UTC
[01/18] incubator-ignite git commit: IGNITE-983: Added support for
primitive types.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-883_1 6bfc78ea6 -> 3417b3dc0
IGNITE-983: Added support for primitive types.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/11c0b904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/11c0b904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/11c0b904
Branch: refs/heads/ignite-883_1
Commit: 11c0b904a934c31ac936a8793cd11bf34af7634b
Parents: 97d0bc1
Author: AKuznetsov <ak...@gridgain.com>
Authored: Wed Jun 3 14:26:49 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Wed Jun 3 14:26:49 2015 +0700
----------------------------------------------------------------------
.../org/apache/ignite/configuration/CacheConfiguration.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11c0b904/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 2c7d8c1..8b1e1a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1664,7 +1664,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
"Number of indexed types is expected to be even. Refer to method javadoc for details.");
- this.indexedTypes = indexedTypes;
+ int len = indexedTypes.length;
+
+ Class<?>[] newIndexedTypes = new Class<?>[len];
+
+ for (int i = 0; i < len; i++)
+ newIndexedTypes[i] = U.box(indexedTypes[i]);
+
+ this.indexedTypes = newIndexedTypes;
return this;
}
[17/18] incubator-ignite git commit: [IGNITE-218]: Wrong staging
permissions while running MR job under hadoop accelerator
Posted by sb...@apache.org.
[IGNITE-218]: Wrong staging permissions while running MR job under hadoop accelerator
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f72917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f72917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f72917
Branch: refs/heads/ignite-883_1
Commit: c9f72917843092d596044197cf7cb05c56a13fca
Parents: 20e5677
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 4 18:20:24 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 4 18:20:24 2015 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopTaskContext.java | 14 +-
.../igfs/IgfsSecondaryFileSystemImpl.java | 2 +-
.../fs/IgniteHadoopFileSystemCounterWriter.java | 14 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 70 ++---
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 2 +-
.../processors/hadoop/HadoopDefaultJobInfo.java | 2 +-
.../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
.../hadoop/SecondaryFileSystemProvider.java | 4 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 20 +-
.../processors/hadoop/v2/HadoopV2Job.java | 31 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 26 +-
.../hadoop/v2/HadoopV2TaskContext.java | 48 +++-
.../hadoop/HadoopClientProtocolSelfTest.java | 6 +-
.../hadoop/HadoopAbstractSelfTest.java | 14 +-
.../hadoop/HadoopCommandLineTest.java | 14 +-
.../processors/hadoop/HadoopMapReduceTest.java | 176 +++++++++++-
.../hadoop/HadoopTaskExecutionSelfTest.java | 2 +-
.../hadoop/HadoopTasksAllVersionsTest.java | 15 +-
.../processors/hadoop/HadoopTasksV1Test.java | 5 +-
.../processors/hadoop/HadoopTasksV2Test.java | 5 +-
.../processors/hadoop/HadoopV2JobSelfTest.java | 6 +-
.../collections/HadoopAbstractMapTest.java | 12 +
22 files changed, 643 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..3d2ee17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,13 +21,14 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Task context.
*/
public abstract class HadoopTaskContext {
/** */
- private final HadoopJob job;
+ protected final HadoopJob job;
/** */
private HadoopTaskInput input;
@@ -187,4 +188,15 @@ public abstract class HadoopTaskContext {
* @throws IgniteCheckedException If failed.
*/
public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+ /**
+ * Executes a callable on behalf of the job owner.
+ * In case of embedded task execution the implementation of this method
+ * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with.
+ * @param c The callable.
+ * @param <T> The return type of the Callable.
+ * @return The result of the callable.
+ * @throws IgniteCheckedException On any error in callable.
+ */
+ public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index b8095b8..44ee90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -121,6 +121,6 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
/** {@inheritDoc} */
@Override public void close() throws IgniteException {
- igfs.stop(true);
+ // No-op.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 66e9761..d910507 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import java.io.*;
@@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
/** */
- private static final String DEFAULT_USER_NAME = "anonymous";
-
- /** */
public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
/** */
@@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
@Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
throws IgniteCheckedException {
- Configuration hadoopCfg = new Configuration();
+ Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
hadoopCfg.set(e.getKey(), e.getValue());
String user = jobInfo.user();
- if (F.isEmpty(user))
- user = DEFAULT_USER_NAME;
+ user = IgfsUtils.fixUserName(user);
String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
@@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
try {
- FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+ hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true);
fs.mkdirs(jobStatPath);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index c0a9ade..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.security.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
@@ -144,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Custom-provided sequential reads before prefetch. */
private int seqReadsBeforePrefetch;
- /** The cache was disabled when the instance was creating. */
- private boolean cacheEnabled;
-
/** {@inheritDoc} */
@Override public URI getUri() {
if (uri == null)
@@ -173,27 +169,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
/**
- * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+ * Gets non-null user name as per the Hadoop file system viewpoint.
* @return the user name, never null.
*/
- public static String getFsHadoopUser(Configuration cfg) throws IOException {
- String user = null;
-
- // -------------------------------------------
- // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
- // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
- // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
- // ugi.doAs() closure.
- if (cfg != null)
- user = cfg.get(MRJobConfig.USER_NAME);
- // -------------------------------------------
-
- if (user == null) {
- UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
-
- if (currUgi != null)
- user = currUgi.getShortUserName();
- }
+ public static String getFsHadoopUser() throws IOException {
+ UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+ String user = currUgi.getShortUserName();
user = IgfsUtils.fixUserName(user);
@@ -228,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
setConf(cfg);
- String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
- cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -242,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
uriAuthority = uri.getAuthority();
- user = getFsHadoopUser(cfg);
+ user = getFsHadoopUser();
// Override sequential reads before prefetch if needed.
seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -360,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override protected void finalize() throws Throwable {
super.finalize();
- close0();
+ close();
}
/** {@inheritDoc} */
@Override public void close() throws IOException {
- if (cacheEnabled && get(getUri(), getConf()) == this)
- return;
-
- close0();
+ if (closeGuard.compareAndSet(false, true))
+ close0();
}
/**
@@ -377,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
* @throws IOException If failed.
*/
private void close0() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- if (LOG.isDebugEnabled())
- LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+ if (LOG.isDebugEnabled())
+ LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
- if (rmtClient == null)
- return;
+ if (rmtClient == null)
+ return;
- super.close();
+ super.close();
- rmtClient.close(false);
+ rmtClient.close(false);
- if (clientLog.isLogEnabled())
- clientLog.close();
+ if (clientLog.isLogEnabled())
+ clientLog.close();
- if (secondaryFs != null)
- U.closeQuiet(secondaryFs);
+ if (secondaryFs != null)
+ U.closeQuiet(secondaryFs);
- // Reset initialized resources.
- uri = null;
- rmtClient = null;
- }
+ // Reset initialized resources.
+ uri = null;
+ rmtClient = null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index f3fbe9c..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
uri = name;
- user = getFsHadoopUser(cfg);
+ user = getFsHadoopUser();
try {
initialize(name, cfg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
synchronized (HadoopDefaultJobInfo.class) {
if ((jobCls0 = jobCls) == null) {
- HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+ HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index d493bd4..68a9ef6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.io.*;
+import java.net.*;
import java.util.*;
/**
@@ -57,6 +63,41 @@ public class HadoopUtils {
/** Old reducer class attribute. */
private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+ /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+ private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+ @Override public FileSystem createValue(FsCacheKey key) {
+ try {
+ assert key != null;
+
+ // Explicitly disable FileSystem caching:
+ URI uri = key.uri();
+
+ String scheme = uri.getScheme();
+
+ // Copy the configuration to avoid altering the external object.
+ Configuration cfg = new Configuration(key.configuration());
+
+ String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+ cfg.setBoolean(prop, true);
+
+ return FileSystem.get(uri, cfg, key.user());
+ }
+ catch (IOException | InterruptedException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+
+ /**
+ * Constructor.
+ */
+ private HadoopUtils() {
+ // No-op.
+ }
+
/**
* Wraps native split.
*
@@ -126,8 +167,6 @@ public class HadoopUtils {
break;
case PHASE_REDUCE:
- // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
- // See https://issues.apache.org/jira/browse/IGNITE-764
setupProgress = 1;
mapProgress = 1;
@@ -304,9 +343,242 @@ public class HadoopUtils {
}
/**
- * Constructor.
+ * Creates {@link Configuration} in a correct class loader context to avoid caching
+ * of inappropriate class loader in the Configuration object.
+ * @return New instance of {@link Configuration}.
*/
- private HadoopUtils() {
- // No-op.
+ public static Configuration safeCreateConfiguration() {
+ final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+ Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+ try {
+ return new Configuration();
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(cl0);
+ }
+ }
+
+ /**
+ * Creates {@link JobConf} in a correct class loader context to avoid caching
+ * of inappropriate class loader in the Configuration object.
+ * @return New instance of {@link JobConf}.
+ */
+ public static JobConf safeCreateJobConf() {
+ final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+ Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+
+ try {
+ return new JobConf();
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(cl0);
+ }
+ }
+
+ /**
+ * Gets non-null user name as per the Hadoop viewpoint.
+ * @param cfg the Hadoop job configuration, may be null.
+ * @return the user name, never null.
+ */
+ private static String getMrHadoopUser(Configuration cfg) throws IOException {
+ String user = cfg.get(MRJobConfig.USER_NAME);
+
+ if (user == null)
+ user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+ return user;
+ }
+
+ /**
+ * Common method to get the V1 file system in MapRed engine.
+ * It creates the filesystem for the user specified in the
+ * configuration with {@link MRJobConfig#USER_NAME} property.
+ * @param uri the file system uri.
+ * @param cfg the configuration.
+ * @return the file system
+ * @throws IOException
+ */
+ public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+ final String usr = getMrHadoopUser(cfg);
+
+ assert usr != null;
+
+ if (uri == null)
+ uri = FileSystem.getDefaultUri(cfg);
+
+ final FileSystem fs;
+
+ if (doCacheFs) {
+ try {
+ fs = getWithCaching(uri, cfg, usr);
+ }
+ catch (IgniteException ie) {
+ throw new IOException(ie);
+ }
+ }
+ else {
+ try {
+ fs = FileSystem.get(uri, cfg, usr);
+ }
+ catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException(ie);
+ }
+ }
+
+ assert fs != null;
+ assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+ return fs;
+ }
+
+ /**
+ * Note that configuration is not a part of the key.
+ * It is used solely to initialize the first instance
+ * that is created for the key.
+ */
+ public static final class FsCacheKey {
+ /** */
+ private final URI uri;
+
+ /** */
+ private final String usr;
+
+ /** */
+ private final String equalityKey;
+
+ /** */
+ private final Configuration cfg;
+
+ /**
+ * Constructor
+ */
+ public FsCacheKey(URI uri, String usr, Configuration cfg) {
+ assert uri != null;
+ assert usr != null;
+ assert cfg != null;
+
+ this.uri = fixUri(uri, cfg);
+ this.usr = usr;
+ this.cfg = cfg;
+
+ this.equalityKey = createEqualityKey();
+ }
+
+ /**
+ * Creates String key used for equality and hashing.
+ */
+ private String createEqualityKey() {
+ GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+ if (uri.getScheme() != null)
+ sb.a(uri.getScheme().toLowerCase());
+
+ sb.a("://");
+
+ if (uri.getAuthority() != null)
+ sb.a(uri.getAuthority().toLowerCase());
+
+ return sb.toString();
+ }
+
+ /**
+ * The URI.
+ */
+ public URI uri() {
+ return uri;
+ }
+
+ /**
+ * The User.
+ */
+ public String user() {
+ return usr;
+ }
+
+ /**
+ * The Configuration.
+ */
+ public Configuration configuration() {
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return equalityKey.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return equalityKey;
+ }
+ }
+
+ /**
+ * Gets FileSystem caching it in static Ignite cache. The cache is a singleton
+ * for each class loader.
+ *
+ * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+ * The Configuration is not a part of the key. This means that for the given key file system is
+ * initialized only once with the Configuration passed in upon the file system creation.
+ *
+ * @param uri The file system URI.
+ * @param cfg The configuration.
+ * @param usr The user to create file system for.
+ * @return The file system: either created, or taken from the cache.
+ */
+ private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+ FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+ return fileSysLazyMap.getOrCreate(key);
+ }
+
+ /**
+ * Gets the property name to disable file system cache.
+ * @param scheme The file system URI scheme.
+ * @return The property name. If scheme is null,
+ * returns "fs.null.impl.disable.cache".
+ */
+ public static String disableFsCachePropertyName(@Nullable String scheme) {
+ return String.format("fs.%s.impl.disable.cache", scheme);
+ }
+
+ /**
+ * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+ * @param uri0 The uri.
+ * @param cfg The cfg.
+ * @return Correct URI.
+ */
+ public static URI fixUri(URI uri0, Configuration cfg) {
+ if (uri0 == null)
+ return FileSystem.getDefaultUri(cfg);
+
+ String scheme = uri0.getScheme();
+ String authority = uri0.getAuthority();
+
+ if (authority == null) {
+ URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+ if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+ return dfltUri;
+ }
+
+ return uri0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index b1a057c..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -34,7 +34,7 @@ import java.security.*;
*/
public class SecondaryFileSystemProvider {
/** Configuration of the secondary filesystem, never null. */
- private final Configuration cfg = new Configuration();
+ private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
/** The secondary filesystem URI, never null. */
private final URI uri;
@@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider {
}
// Disable caching:
- String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+ String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
cfg.setBoolean(prop, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2e04ac1..b170125 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
/** {@inheritDoc} */
@Override public Void call() throws IgniteCheckedException {
+ ctx = job.getTaskContext(info);
+
+ return ctx.runAsJobOwner(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ call0();
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Implements actual task running.
+ * @throws IgniteCheckedException
+ */
+ void call0() throws IgniteCheckedException {
execStartTs = U.currentTimeMillis();
Throwable err = null;
@@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
HadoopPerformanceCounter perfCntr = null;
try {
- ctx = job.getTaskContext(info);
-
perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
perfCntr.onTaskSubmit(info, submitTs);
@@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
if (ctx != null)
ctx.cleanupTaskEnvironment();
}
-
- return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index d265ca8..d754039 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.JobID;
@@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob {
new ConcurrentHashMap8<>();
/** Pooling task context class and thus class loading environment. */
- private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+ private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
/** All created contexts. */
private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
@@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
- HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
-
- // Before create JobConf instance we should set new context class loader.
- Thread.currentThread().setContextClassLoader(clsLdr);
-
- jobConf = new JobConf();
+ jobConf = HadoopUtils.safeCreateJobConf();
HadoopFileSystemsUtils.setupFileSystems(jobConf);
@@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob {
Path jobDir = new Path(jobDirPath);
- try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+ try {
+ FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true);
+
JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
jobDir);
@@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob {
if (old != null)
return old.get();
- Class<?> cls = taskCtxClsPool.poll();
+ Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
try {
if (cls == null) {
@@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob {
// Note that the classloader identified by the task it was initially created for,
// but later it may be reused for other tasks.
HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
- "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+ "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
- cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+ cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
fullCtxClsQueue.add(cls);
}
@@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob {
/** {@inheritDoc} */
@Override public void cleanupStagingDirectory() {
- if (rsrcMgr != null)
- rsrcMgr.cleanupStagingDirectory();
+ rsrcMgr.cleanupStagingDirectory();
+ }
+
+ /**
+ * Getter for job configuration.
+ * @return The job configuration.
+ */
+ public JobConf jobConf() {
+ return jobConf;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..2f64e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -40,6 +40,9 @@ import java.util.*;
* files are needed to be placed on local files system.
*/
public class HadoopV2JobResourceManager {
+ /** File type Fs disable caching property name. */
+ private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file");
+
/** Hadoop job context. */
private final JobContextImpl ctx;
@@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager {
try {
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
- if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+ if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
}
finally {
@@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager {
stagingDir = new Path(new URI(mrDir));
if (download) {
- FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+ FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
if (!fs.exists(stagingDir))
- throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
- stagingDir);
+ throw new IgniteCheckedException("Failed to find map-reduce submission " +
+ "directory (does not exist): " + stagingDir);
if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
- throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
- "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+ throw new IgniteCheckedException("Failed to copy job submission directory "
+ + "contents to local file system "
+ + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+ + ", jobId=" + jobId + ']');
}
File jarJobFile = new File(jobLocDir, "job.jar");
@@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager {
}
}
else if (!jobLocDir.mkdirs())
- throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+ throw new IgniteCheckedException("Failed to create local job directory: "
+ + jobLocDir.getAbsolutePath());
setLocalFSWorkingDirectory(jobLocDir);
}
@@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager {
FileSystem dstFs = FileSystem.getLocal(cfg);
- FileSystem srcFs = srcPath.getFileSystem(cfg);
+ FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
if (extract) {
File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
if (!archivesPath.exists() && !archivesPath.mkdir())
throw new IOException("Failed to create directory " +
- "[path=" + archivesPath + ", jobId=" + jobId + ']');
+ "[path=" + archivesPath + ", jobId=" + jobId + ']');
File archiveFile = new File(archivesPath, locName);
@@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager {
public void cleanupStagingDirectory() {
try {
if (stagingDir != null)
- stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
+ HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true);
}
catch (Exception e) {
log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index dd18c66..e89feba 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.io.*;
+import java.security.*;
import java.util.*;
+import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -419,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
- try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+ try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false);
FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
in.seek(split.offset());
@@ -448,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
throw new IgniteCheckedException(e);
}
}
+
+ /** {@inheritDoc} */
+ @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+ String user = job.info().user();
+
+ user = IgfsUtils.fixUserName(user);
+
+ assert user != null;
+
+ String ugiUser;
+
+ try {
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+ assert currUser != null;
+
+ ugiUser = currUser.getShortUserName();
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe);
+ }
+
+ try {
+ if (F.eq(user, ugiUser))
+ // if current UGI context user is the same, do direct call:
+ return c.call();
+ else {
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ @Override public T run() throws Exception {
+ return c.call();
+ }
+ });
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
import org.apache.ignite.hadoop.mapreduce.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
* @return Configuration.
*/
private Configuration config(int port) {
- Configuration conf = new Configuration();
+ Configuration conf = HadoopUtils.safeCreateConfiguration();
setupFileSystems(conf);
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
ctx.getCounter(TestCounter.COUNTER2).increment(1);
int sum = 0;
- for (IntWritable value : values) {
+ for (IntWritable value : values)
sum += value.get();
- }
ctx.write(key, new IntWritable(sum));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..e8a0a6f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
/** Initial REST port. */
private int restPort = REST_PORT;
+ /** Secondary file system REST endpoint configuration. */
+ protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+ static {
+ SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+ SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+ SECONDARY_REST_CFG.setPort(11500);
+ }
+
+
/** Initial classpath. */
private static String initCp;
@@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
/**
* @return IGFS configuration.
*/
- public FileSystemConfiguration igfsConfiguration() {
+ public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration cfg = new FileSystemConfiguration();
cfg.setName(igfsName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
index d10ee5c..c66cdf3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop;
import com.google.common.base.*;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.resource.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jsr166.*;
@@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+ String cfgPath = "config/hadoop/default-config.xml";
+
+ IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+ IgniteConfiguration cfg = tup.get1();
+
+ cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+ igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..a1ef7ba 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.*;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
/**
* Test of whole cycle of map-reduce processing via Job tracker.
*/
public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+ /** IGFS block size. */
+ protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+ /** Amount of blocks to prefetch. */
+ protected static final int PREFETCH_BLOCKS = 1;
+
+ /** Amount of sequential block reads before prefetch is triggered. */
+ protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+ /** Secondary file system URI. */
+ protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+ /** Secondary file system configuration path. */
+ protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+ /** The user to run Hadoop job on behalf of. */
+ protected static final String USER = "vasya";
+
+ /** Secondary IGFS name. */
+ protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+ /** The secondary Ignite node. */
+ protected Ignite igniteSecondary;
+
+ /** The secondary Fs. */
+ protected IgfsSecondaryFileSystem secondaryFs;
+
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
}
/**
+ * Gets owner of a IgfsEx path.
+ * @param p The path.
+ * @return The owner.
+ */
+ private static String getOwner(IgfsEx i, IgfsPath p) {
+ return i.info(p).property(IgfsEx.PROP_USER_NAME);
+ }
+
+ /**
+ * Gets owner of a secondary Fs path.
+ * @param secFs The sec Fs.
+ * @param p The path.
+ * @return The owner.
+ */
+ private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+ return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+ @Override public String apply() {
+ return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+ }
+ });
+ }
+
+ /**
+ * Checks owner of the path.
+ * @param p The path.
+ */
+ private void checkOwner(IgfsPath p) {
+ String ownerPrim = getOwner(igfs, p);
+ assertEquals(USER, ownerPrim);
+
+ String ownerSec = getOwnerSecondary(secondaryFs, p);
+ assertEquals(USER, ownerSec);
+ }
+
+ /**
* Tests whole job execution with all phases in all combination of new and old versions of API.
* @throws Exception If fails.
*/
@@ -71,7 +144,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
JobConf jobConf = new JobConf();
jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
- jobConf.setUser("yyy");
+ jobConf.setUser(USER);
jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
//To split into about 40 items for v2
@@ -105,14 +178,20 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
checkJobStatistics(jobId);
+ final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+ checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+ checkOwner(new IgfsPath(outFile));
+
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
- useNewReducer,
+ useNewReducer,
"blue\t200000\n" +
- "green\t150000\n" +
- "red\t100000\n" +
- "yellow\t70000\n",
- readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
- );
+ "green\t150000\n" +
+ "red\t100000\n" +
+ "yellow\t70000\n",
+ readAndSortFile(outFile)
+ );
}
}
@@ -182,7 +261,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
}
}
- final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+ final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
@@ -212,4 +291,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
}
}
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+ super.beforeTest();
+ }
+
+ /**
+ * Start grid with IGFS.
+ *
+ * @param gridName Grid name.
+ * @param igfsName IGFS name
+ * @param mode IGFS mode.
+ * @param secondaryFs Secondary file system (optional).
+ * @param restCfg Rest configuration string (optional).
+ * @return Started grid instance.
+ * @throws Exception If failed.
+ */
+ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+ @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("dataCache");
+ igfsCfg.setMetaCacheName("metaCache");
+ igfsCfg.setName(igfsName);
+ igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+ igfsCfg.setDefaultMode(mode);
+ igfsCfg.setIpcEndpointConfiguration(restCfg);
+ igfsCfg.setSecondaryFileSystem(secondaryFs);
+ igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+ igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+ CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+ dataCacheCfg.setName("dataCache");
+ dataCacheCfg.setCacheMode(PARTITIONED);
+ dataCacheCfg.setNearConfiguration(null);
+ dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+ dataCacheCfg.setBackups(0);
+ dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+ dataCacheCfg.setOffHeapMaxMemory(0);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("metaCache");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setGridName(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setConnectorConfiguration(null);
+
+ return G.start(cfg);
+ }
+
+ /**
+ * @return IGFS configuration.
+ */
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+ FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+ secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+ fsCfg.setSecondaryFileSystem(secondaryFs);
+
+ return fsCfg;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
/** {@inheritDoc} */
- @Override public FileSystemConfiguration igfsConfiguration() {
+ @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
FileSystemConfiguration cfg = super.igfsConfiguration();
cfg.setFragmentizerEnabled(false);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.io.*;
import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Hadoop job.
* @throws IOException If fails.
*/
- public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+ public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
/**
* @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
igfs.info(inFile).length() - fileBlock1.length());
- HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Context with mock output.
* @throws IgniteCheckedException If fails.
*/
- private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+ private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
int taskNum, String... words) throws IgniteCheckedException {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @throws Exception If fails.
*/
public void testReduceTask() throws Exception {
- HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @throws Exception If fails.
*/
public void testCombinerTask() throws Exception {
- HadoopV2Job gridJob = getHadoopJob("/", "/");
+ HadoopJob gridJob = getHadoopJob("/", "/");
HadoopTestTaskContext ctx =
runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Context of combine task with mock output.
* @throws IgniteCheckedException If fails.
*/
- private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+ private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
throws IgniteCheckedException {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
- HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.hadoop.mapred.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.io.*;
import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
* @return Hadoop job.
* @throws IOException If fails.
*/
- @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
- return new HadoopV2Job(jobId, jobInfo, log);
+ return jobInfo.createJob(jobId, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.util.*;
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
* @return Hadoop job.
* @throws Exception if fails.
*/
- @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
Job job = Job.getInstance();
job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
- return new HadoopV2Job(jobId, jobInfo, log);
+ return jobInfo.createJob(jobId, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
cfg.setMapOutputValueClass(Text.class);
cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
- HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+ HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+ HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+ HadoopJob job = info.createJob(id, log);
HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
null));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..9395c5e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Abstract class for maps test.
@@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
assert false;
}
+ /** {@inheritDoc} */
@Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
assert false;
}
+
+ /** {@inheritDoc} */
+ @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+ try {
+ return c.call();
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
}
/**
[14/18] incubator-ignite git commit: # ignite-981
Posted by sb...@apache.org.
# ignite-981
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6ea325c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6ea325c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6ea325c
Branch: refs/heads/ignite-883_1
Commit: a6ea325cf751cd9197eb80230cedc76a6a30daa4
Parents: ddcb2a3
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 13:41:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 13:41:59 2015 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsClientCacheSelfTest.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6ea325c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
index 02166c4..9cda1b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
@@ -85,13 +85,16 @@ public class IgfsClientCacheSelfTest extends IgfsAbstractSelfTest {
cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME),
cacheConfiguration(CACHE_NAME));
- if (!gridName.equals(getTestGridName(0)))
- cfg.setClientMode(true);
-
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(IP_FINDER);
+ if (!gridName.equals(getTestGridName(0))) {
+ cfg.setClientMode(true);
+
+ disco.setForceServerMode(true);
+ }
+
cfg.setDiscoverySpi(disco);
FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
[08/18] incubator-ignite git commit: # ignite-sprint-5
Posted by sb...@apache.org.
# ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7501025d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7501025d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7501025d
Branch: refs/heads/ignite-883_1
Commit: 7501025d815a61ef881d86fa326bc6e17064ec0e
Parents: ae5189a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 11:01:15 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:01:15 2015 +0300
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/internal/IgniteKernal.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7501025d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..a0d97c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -963,8 +963,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
sysPoolQSize = exec.getQueue().size();
}
+ String id = U.id8(localNode().id());
+
String msg = NL +
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+ " ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
[12/18] incubator-ignite git commit: # IGNITE-983. Minor fix after
review.
Posted by sb...@apache.org.
# IGNITE-983. Minor fix after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bf3203a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bf3203a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bf3203a4
Branch: refs/heads/ignite-883_1
Commit: bf3203a42fbd92e5960c29f672351d20cd756897
Parents: 46b2447
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:50:16 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:50:16 2015 +0700
----------------------------------------------------------------------
...gniteCacheConfigurationPrimitiveTypesSelfTest.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf3203a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
index 967a466..e90f10c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
@@ -93,12 +93,12 @@ public class IgniteCacheConfigurationPrimitiveTypesSelfTest extends GridCommonAb
assert cache.query(new ScanQuery<>()).getAll().size() == 7;
- assert cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size() == 1;
- assert cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size() == 1;
+ assertEquals(cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size(), 1);
+ assertEquals(cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size(), 1);
}
}
[09/18] incubator-ignite git commit: Merge branch 'ignite-988' into
ignite-sprint-5
Posted by sb...@apache.org.
Merge branch 'ignite-988' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e625709b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e625709b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e625709b
Branch: refs/heads/ignite-883_1
Commit: e625709b2f74853ef883df6cafa46b8a2b0245f7
Parents: ae5189a 46eab5b
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:06:25 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:06:25 2015 +0700
----------------------------------------------------------------------
.../java/org/apache/ignite/internal/visor/query/VisorQueryJob.java | 2 +-
.../apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[04/18] incubator-ignite git commit: IGNITE-991 - Fix cache start
from client node config.
Posted by sb...@apache.org.
IGNITE-991 - Fix cache start from client node config.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de11fff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de11fff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de11fff
Branch: refs/heads/ignite-883_1
Commit: 1de11fff3cea7883a2f28a35107d7c9dfc75d5e0
Parents: 97d0bc1
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Jun 3 16:34:12 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Jun 3 16:34:12 2015 -0700
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 18 +++-
...niteDynamicCacheWithConfigStartSelfTest.java | 98 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 1 +
5 files changed, 116 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 2049d03..c3f3e7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -220,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) {
+ if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1ae4ae7..af121c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -249,7 +249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
+ if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -276,7 +276,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (cctx.rebalanceEnabled()) {
for (int p = 0; p < num; p++) {
// If this is the first node in grid.
- boolean added = exchFut.isCacheAdded(cctx.cacheId());
+ boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
assert exchId.isJoined() || added;
@@ -668,7 +668,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
- ", allIds=" + allIds + ", node2part=" + node2part + ']';
+ ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index db43c6c..a03e2e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -295,7 +295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cacheId Cache ID to check.
* @return {@code True} if cache was added during this exchange.
*/
- public boolean isCacheAdded(int cacheId) {
+ public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) {
if (req.start() && !req.clientStartOnly()) {
@@ -305,7 +305,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
- return false;
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
}
/**
@@ -505,11 +507,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx.isLocal())
continue;
- cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
-
GridDhtPartitionTopology top = cacheCtx.topology();
top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+
+ if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
+ initTopology(cacheCtx);
+
+ top.beforeExchange(this);
+ }
+ else
+ cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
}
if (exchId.isLeft())
@@ -566,7 +574,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert oldestNode.get() != null;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (isCacheAdded(cacheCtx.cacheId())) {
+ if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
new file mode 100644
index 0000000..dcd6a69
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String CACHE_NAME = "partitioned";
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ if (client)
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+ ccfg.setIndexedTypes(String.class, String.class);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartCacheOnClient() throws Exception {
+ int srvCnt = 3;
+
+ startGrids(srvCnt);
+
+ try {
+ client = true;
+
+ IgniteEx client = startGrid(srvCnt);
+
+ for (int i = 0; i < 100; i++)
+ client.cache(CACHE_NAME).put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assertEquals(i, grid(0).cache(CACHE_NAME).get(i));
+
+ client.cache(CACHE_NAME).removeAll();
+
+ for (int i = 0; i < 100; i++)
+ assertNull(grid(0).cache(CACHE_NAME).get(i));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index a8019d2..15756d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -99,6 +99,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
+ suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
suite.addTestSuite(IgniteCacheConfigurationTemplateTest.class);
suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);
[11/18] incubator-ignite git commit: Merge branches 'ignite-983' and
'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite
into ignite-983
Posted by sb...@apache.org.
Merge branches 'ignite-983' and 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-983
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46b24472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46b24472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46b24472
Branch: refs/heads/ignite-883_1
Commit: 46b244723a0c39c7bb5bb92157d71032d655923a
Parents: 51d4737 922c1c4
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:29:43 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:29:43 2015 +0700
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../processors/cache/GridCacheContext.java | 3 -
.../dht/GridDhtPartitionTopologyImpl.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 10 +-
.../internal/visor/query/VisorQueryJob.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 3 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 31 -----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 56 +++++++-
.../tcp/ipfinder/TcpDiscoveryIpFinder.java | 10 +-
.../TcpDiscoveryMulticastIpFinder.java | 47 +++++--
.../cache/IgniteDynamicCacheStartSelfTest.java | 62 +++++++++
.../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
.../gce/TcpDiscoveryGoogleStorageIpFinder.java | 43 ++++---
.../commands/cache/VisorCacheScanCommand.scala | 2 +-
15 files changed, 326 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
[13/18] incubator-ignite git commit: # ignite-981 fixed wait for
cache initialization on clients
Posted by sb...@apache.org.
# ignite-981 fixed wait for cache initialization on clients
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ddcb2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ddcb2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ddcb2a3f
Branch: refs/heads/ignite-883_1
Commit: ddcb2a3f6932fe8d3f86d3e1c16a3c4a4610959f
Parents: 1603fe5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:25:42 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:50:36 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 117 +++++++------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6e8d457..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1722,68 +1722,83 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- Object msgBody = ioMsg.body();
-
- assert msgBody != null || ioMsg.bodyBytes() != null;
+ busyLock.readLock();
try {
- byte[] msgTopicBytes = ioMsg.topicBytes();
-
- Object msgTopic = ioMsg.topic();
-
- GridDeployment dep = ioMsg.deployment();
-
- if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
- ioMsg.deploymentClassName() != null) {
- dep = ctx.deploy().getGlobalDeployment(
- ioMsg.deploymentMode(),
- ioMsg.deploymentClassName(),
- ioMsg.deploymentClassName(),
- ioMsg.userVersion(),
- nodeId,
- ioMsg.classLoaderId(),
- ioMsg.loaderParticipants(),
- null);
-
- if (dep == null)
- throw new IgniteDeploymentCheckedException(
- "Failed to obtain deployment information for user message. " +
- "If you are using custom message or topic class, try implementing " +
- "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
- ioMsg.deployment(dep); // Cache deployment.
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received user message while stopping (will ignore) [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
+
+ return;
}
- // Unmarshall message topic if needed.
- if (msgTopic == null && msgTopicBytes != null) {
- msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+ Object msgBody = ioMsg.body();
- ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
- }
+ assert msgBody != null || ioMsg.bodyBytes() != null;
- if (!F.eq(topic, msgTopic))
- return;
+ try {
+ byte[] msgTopicBytes = ioMsg.topicBytes();
+
+ Object msgTopic = ioMsg.topic();
+
+ GridDeployment dep = ioMsg.deployment();
+
+ if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+ ioMsg.deploymentClassName() != null) {
+ dep = ctx.deploy().getGlobalDeployment(
+ ioMsg.deploymentMode(),
+ ioMsg.deploymentClassName(),
+ ioMsg.deploymentClassName(),
+ ioMsg.userVersion(),
+ nodeId,
+ ioMsg.classLoaderId(),
+ ioMsg.loaderParticipants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException(
+ "Failed to obtain deployment information for user message. " +
+ "If you are using custom message or topic class, try implementing " +
+ "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+ ioMsg.deployment(dep); // Cache deployment.
+ }
- if (msgBody == null) {
- msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+ // Unmarshall message topic if needed.
+ if (msgTopic == null && msgTopicBytes != null) {
+ msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
- ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
- }
+ ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+ }
- // Resource injection.
- if (dep != null)
- ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
- msg + ']', e);
- }
+ if (!F.eq(topic, msgTopic))
+ return;
+
+ if (msgBody == null) {
+ msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+
+ ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+ }
- if (msgBody != null) {
- if (predLsnr != null) {
- if (!predLsnr.apply(nodeId, msgBody))
- removeMessageListener(TOPIC_COMM_USER, this);
+ // Resource injection.
+ if (dep != null)
+ ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+ msg + ']', e);
+ }
+
+ if (msgBody != null) {
+ if (predLsnr != null) {
+ if (!predLsnr.apply(nodeId, msgBody))
+ removeMessageListener(TOPIC_COMM_USER, this);
+ }
+ }
+ }
+ finally {
+ busyLock.readUnlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1aef18c..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -274,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
new file mode 100644
index 0000000..855a4f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Message topic. */
+ private enum TOPIC {
+ /** */
+ ORDERED
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new OptimizedMarshaller(false));
+
+ if (gridName.equals(getTestGridName(2))) {
+ cfg.setClientMode(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ }
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMessageSendWithClientJoin() throws Exception {
+ startGrid(0);
+
+ Ignite ignite1 = startGrid(1);
+
+ ClusterGroup rmts = ignite1.cluster().forRemotes();
+
+ IgniteMessaging msg = ignite1.message(rmts);
+
+ msg.localListen(TOPIC.ORDERED, new LocalListener());
+
+ msg.remoteListen(TOPIC.ORDERED, new RemoteListener());
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int iter = 0;
+
+ while (!stop.get()) {
+ if (iter % 10 == 0)
+ log.info("Client start/stop iteration: " + iter);
+
+ iter++;
+
+ try (Ignite ignite = startGrid(2)) {
+ assertTrue(ignite.configuration().isClientMode());
+ }
+ }
+
+ return null;
+ }
+ }, 1, "client-start-stop");
+
+ try {
+ long stopTime = U.currentTimeMillis() + 30_000;
+
+ int iter = 0;
+
+ while (System.currentTimeMillis() < stopTime) {
+ try {
+ ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter), 0);
+ }
+ catch (IgniteException e) {
+ log.info("Message send failed: " + e);
+ }
+
+ iter++;
+
+ if (iter % 100 == 0)
+ Thread.sleep(5);
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ fut.get();
+ }
+
+ /**
+ *
+ */
+ private static class LocalListener implements IgniteBiPredicate<UUID, String> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, String s) {
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoteListener implements IgniteBiPredicate<UUID, String> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID nodeId, String msg) {
+ ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9eb31f1..e0a1e6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridSelfTest.class));
suite.addTest(new TestSuite(GridProjectionSelfTest.class));
suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+ suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
if (U.isLinux() || U.isMacOs())
[06/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-991
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-991
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38c084a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38c084a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38c084a8
Branch: refs/heads/ignite-883_1
Commit: 38c084a81850d26e336382822574004b79fce935
Parents: 1de11ff bd3abbc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:27:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:27:03 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/processors/cache/GridCacheContext.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
[10/18] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-5' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/922c1c44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/922c1c44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/922c1c44
Branch: refs/heads/ignite-883_1
Commit: 922c1c445b2519dbea1c7cb68145c05b2c063d41
Parents: e625709 7501025
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 15:14:20 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 15:14:20 2015 +0700
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/internal/IgniteKernal.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
[02/18] incubator-ignite git commit: IGNITE-983: Added tests.
Posted by sb...@apache.org.
IGNITE-983: Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bafad996
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bafad996
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bafad996
Branch: refs/heads/ignite-883_1
Commit: bafad99677ff63431adfc9ca9c3e3a3897447c25
Parents: 11c0b90
Author: AKuznetsov <ak...@gridgain.com>
Authored: Wed Jun 3 15:25:49 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Wed Jun 3 15:25:49 2015 +0700
----------------------------------------------------------------------
...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++++++++++++++
.../IgniteCacheWithIndexingTestSuite.java | 2 +
2 files changed, 106 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bafad996/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
new file mode 100644
index 0000000..967a466
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationPrimitiveTypesSelfTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheConfigurationPrimitiveTypesSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimitiveTypes() throws Exception {
+ Ignite ignite = startGrid(1);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("c1");
+
+ ccfg.setIndexedTypes(
+ byte.class, byte.class,
+ short.class, short.class,
+ int.class, int.class,
+ long.class, long.class,
+ float.class, float.class,
+ double.class, double.class,
+ boolean.class, boolean.class);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(ccfg);
+
+ byte b = 1;
+ cache.put(b, b);
+
+ short s = 2;
+ cache.put(s, s);
+
+ int i = 3;
+ cache.put(i, i);
+
+ long l = 4;
+ cache.put(l, l);
+
+ float f = 5;
+ cache.put(f, f);
+
+ double d = 6;
+ cache.put(d, d);
+
+ boolean bool = true;
+ cache.put(bool, bool);
+
+ assert cache.query(new ScanQuery<>()).getAll().size() == 7;
+
+ assert cache.query(new SqlQuery<>(Byte.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Short.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Integer.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Long.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Float.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Double.class, "1 = 1")).getAll().size() == 1;
+ assert cache.query(new SqlQuery<>(Boolean.class, "1 = 1")).getAll().size() == 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bafad996/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 240caff..67ebda9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -51,6 +51,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(CacheConfigurationP2PTest.class);
+ suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
+
return suite;
}
}
[03/18] incubator-ignite git commit: # IGNITE-983 Fixed logic to
support null value.
Posted by sb...@apache.org.
# IGNITE-983 Fixed logic to support null value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/51d4737a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/51d4737a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/51d4737a
Branch: refs/heads/ignite-883_1
Commit: 51d4737ab70a2962ec8ef3f300317e1223d52ab3
Parents: bafad99
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 00:17:27 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 00:17:27 2015 +0700
----------------------------------------------------------------------
.../ignite/configuration/CacheConfiguration.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51d4737a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 8b1e1a5..1aa4fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1664,14 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
"Number of indexed types is expected to be even. Refer to method javadoc for details.");
- int len = indexedTypes.length;
+ if (indexedTypes != null) {
+ int len = indexedTypes.length;
- Class<?>[] newIndexedTypes = new Class<?>[len];
+ Class<?>[] newIndexedTypes = new Class<?>[len];
- for (int i = 0; i < len; i++)
- newIndexedTypes[i] = U.box(indexedTypes[i]);
+ for (int i = 0; i < len; i++)
+ newIndexedTypes[i] = U.box(indexedTypes[i]);
- this.indexedTypes = newIndexedTypes;
+ this.indexedTypes = newIndexedTypes;
+ }
+ else
+ this.indexedTypes = null;
return this;
}
[16/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-981' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-981' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20e56777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20e56777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20e56777
Branch: refs/heads/ignite-883_1
Commit: 20e567773b5b0674754a348b6d41d29370f3bd87
Parents: a03d111 a6ea325
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 15:06:44 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 15:06:44 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 117 +++++++------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../igfs/IgfsClientCacheSelfTest.java | 9 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
5 files changed, 238 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
[15/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-991' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-991' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a03d111f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a03d111f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a03d111f
Branch: refs/heads/ignite-883_1
Commit: a03d111f26fd16faf0629bc11fe2ef201dee92f3
Parents: bf3203a b5ee09f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 14:14:29 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 14:14:29 2015 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 19 +++-
...niteDynamicCacheWithConfigStartSelfTest.java | 97 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 1 +
5 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03d111f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03d111f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
[07/18] incubator-ignite git commit: # ignite-991 minor
Posted by sb...@apache.org.
# ignite-991 minor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5ee09f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5ee09f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5ee09f0
Branch: refs/heads/ignite-883_1
Commit: b5ee09f0db550dc6a29c03e07473465d4ac767ab
Parents: 38c084a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:34:19 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:34:19 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/GridDhtPartitionTopologyImpl.java | 4 +++-
.../dht/preloader/GridDhtPartitionsExchangeFuture.java | 1 +
.../cache/IgniteDynamicCacheWithConfigStartSelfTest.java | 5 ++---
3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index af121c3..2656990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -668,7 +668,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
- ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']';
+ ", allIds=" + allIds +
+ ", node2part=" + node2part +
+ ", cache=" + cctx.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a03e2e8..fdaded1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -293,6 +293,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param cacheId Cache ID to check.
+ * @param topVer Topology version.
* @return {@code True} if cache was added during this exchange.
*/
public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5ee09f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
index dcd6a69..6386f8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -48,7 +48,7 @@ public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstrac
cfg.setDiscoverySpi(discoSpi);
if (client)
- cfg.setCacheConfiguration(cacheConfiguration(gridName));
+ cfg.setCacheConfiguration(cacheConfiguration());
cfg.setClientMode(client);
@@ -56,10 +56,9 @@ public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstrac
}
/**
- * @param cacheName Cache name.
* @return Cache configuration.
*/
- protected CacheConfiguration cacheConfiguration(String cacheName) {
+ private CacheConfiguration cacheConfiguration() {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
ccfg.setIndexedTypes(String.class, String.class);
[05/18] incubator-ignite git commit: # IGNITE-988. Reworked scan
command.
Posted by sb...@apache.org.
# IGNITE-988. Reworked scan command.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46eab5b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46eab5b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46eab5b6
Branch: refs/heads/ignite-883_1
Commit: 46eab5b67f1f10260b27a1eb0474904982aa3725
Parents: bd3abbc
Author: AKuznetsov <ak...@gridgain.com>
Authored: Thu Jun 4 10:02:02 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Thu Jun 4 10:02:02 2015 +0700
----------------------------------------------------------------------
.../java/org/apache/ignite/internal/visor/query/VisorQueryJob.java | 2 +-
.../apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46eab5b6/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index 4a9daad..8915240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -65,7 +65,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
try {
UUID nid = ignite.localNode().id();
- boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN");
+ boolean scan = arg.queryTxt() == null;
String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" +
UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46eab5b6/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
index 4b66720..3aa2a19 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
@@ -139,7 +139,7 @@ class VisorCacheScanCommand {
val firstPage =
try
executeRandom(groupForDataNode(node, cacheName),
- classOf[VisorQueryTask], new VisorQueryArg(cacheName, "SCAN", false, pageSize)) match {
+ classOf[VisorQueryTask], new VisorQueryArg(cacheName, null, false, pageSize)) match {
case x if x.get1() != null =>
error(x.get1())
[18/18] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-883_1
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3417b3dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3417b3dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3417b3dc
Branch: refs/heads/ignite-883_1
Commit: 3417b3dc0904708dcbea6396f88737a2aa99fa79
Parents: 6bfc78e c9f7291
Author: sboikov <se...@inria.fr>
Authored: Thu Jun 4 21:28:32 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu Jun 4 21:28:32 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 13 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../managers/communication/GridIoManager.java | 117 ++++----
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 19 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../processors/hadoop/HadoopTaskContext.java | 14 +-
.../igfs/IgfsSecondaryFileSystemImpl.java | 2 +-
.../internal/visor/query/VisorQueryJob.java | 2 +-
...niteDynamicCacheWithConfigStartSelfTest.java | 97 +++++++
.../igfs/IgfsClientCacheSelfTest.java | 9 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
.../testsuites/IgniteCacheTestSuite4.java | 1 +
.../fs/IgniteHadoopFileSystemCounterWriter.java | 14 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 70 ++---
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 2 +-
.../processors/hadoop/HadoopDefaultJobInfo.java | 2 +-
.../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
.../hadoop/SecondaryFileSystemProvider.java | 4 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 20 +-
.../processors/hadoop/v2/HadoopV2Job.java | 31 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 26 +-
.../hadoop/v2/HadoopV2TaskContext.java | 48 +++-
.../hadoop/HadoopClientProtocolSelfTest.java | 6 +-
.../hadoop/HadoopAbstractSelfTest.java | 14 +-
.../hadoop/HadoopCommandLineTest.java | 14 +-
.../processors/hadoop/HadoopMapReduceTest.java | 176 +++++++++++-
.../hadoop/HadoopTaskExecutionSelfTest.java | 2 +-
.../hadoop/HadoopTasksAllVersionsTest.java | 15 +-
.../processors/hadoop/HadoopTasksV1Test.java | 5 +-
.../processors/hadoop/HadoopTasksV2Test.java | 5 +-
.../processors/hadoop/HadoopV2JobSelfTest.java | 6 +-
.../collections/HadoopAbstractMapTest.java | 12 +
...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++
.../IgniteCacheWithIndexingTestSuite.java | 2 +
.../commands/cache/VisorCacheScanCommand.scala | 2 +-
38 files changed, 1122 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3417b3dc/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------