You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/16 09:07:38 UTC
[3/8] incubator-ignite git commit: Revert "ignite-471: fixed NPE in
PortableMarshaller"
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 1e85ecd..8410e71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -39,8 +38,20 @@ class GridResourceIoc {
private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap =
new ConcurrentHashMap8<>();
- /** Class descriptors cache. */
- private final ConcurrentMap<Class<?>, ClassDescriptor> clsDescs = new ConcurrentHashMap8<>();
+ /** Field cache. */
+ private final ConcurrentMap<Class<?>, ConcurrentMap<Class<? extends Annotation>, GridResourceField[]>> fieldCache =
+ new ConcurrentHashMap8<>();
+
+ /** Method cache. */
+ private final ConcurrentMap<Class<?>, ConcurrentMap<Class<? extends Annotation>, GridResourceMethod[]>> mtdCache =
+ new ConcurrentHashMap8<>();
+
+ /**
+ * Cache for classes that do not require injection with some annotation.
+ * Maps annotation classes to set a set of target classes to skip.
+ */
+ private final ConcurrentMap<Class<? extends Annotation>, Set<Class<?>>> skipCache =
+ new ConcurrentHashMap8<>();
/** */
private final ConcurrentMap<Class<?>, Class<? extends Annotation>[]> annCache =
@@ -53,8 +64,18 @@ class GridResourceIoc {
Set<Class<?>> clss = taskMap.remove(ldr);
if (clss != null) {
- clsDescs.keySet().removeAll(clss);
- annCache.keySet().removeAll(clss);
+ fieldCache.keySet().removeAll(clss);
+ mtdCache.keySet().removeAll(clss);
+
+ for (Map.Entry<Class<? extends Annotation>, Set<Class<?>>> e : skipCache.entrySet()) {
+ Set<Class<?>> skipClss = e.getValue();
+
+ if (skipClss != null)
+ e.getValue().removeAll(clss);
+ }
+
+ for (Class<?> cls : clss)
+ annCache.remove(cls);
}
}
@@ -63,8 +84,8 @@ class GridResourceIoc {
*/
void undeployAll() {
taskMap.clear();
- clsDescs.clear();
- annCache.clear();
+ mtdCache.clear();
+ fieldCache.clear();
}
/**
@@ -86,26 +107,15 @@ class GridResourceIoc {
@Nullable Class<?> depCls)
throws IgniteCheckedException
{
- return injectInternal(target, annCls, injector, dep, depCls, null);
- }
-
- /**
- * @param cls Class.
- */
- private ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) {
- ClassDescriptor res = clsDescs.get(cls);
-
- if (res == null) {
- if (dep != null) {
- Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
-
- classes.add(cls);
- }
+ assert target != null;
+ assert annCls != null;
+ assert injector != null;
- res = F.addIfAbsent(clsDescs, cls, new ClassDescriptor(cls));
- }
+ if (isAnnotationPresent(target, annCls, dep))
+ // Use identity hash set to compare via referential equality.
+ return injectInternal(target, annCls, injector, dep, depCls, new GridLeanIdentitySet<>());
- return res;
+ return false;
}
/**
@@ -123,54 +133,73 @@ class GridResourceIoc {
GridResourceInjector injector,
@Nullable GridDeployment dep,
@Nullable Class<?> depCls,
- @Nullable Set<Object> checkedObjs)
+ Set<Object> checkedObjs)
throws IgniteCheckedException
{
- Class<?> targetCls = target.getClass();
+ assert target != null;
+ assert annCls != null;
+ assert injector != null;
+ assert checkedObjs != null;
- ClassDescriptor descr = descriptor(dep, targetCls);
+ Class<?> targetCls = target.getClass();
- T2<GridResourceField[], GridResourceMethod[]> annotatedMembers = descr.annotatedMembers(annCls);
+ Set<Class<?>> skipClss = skipCache.get(annCls);
- if (descr.recursiveFields().length == 0 && annotatedMembers == null)
+ // Skip this class if it does not need to be injected.
+ if (skipClss != null && skipClss.contains(targetCls))
return false;
- if (checkedObjs == null && descr.recursiveFields().length > 0)
- checkedObjs = new GridLeanIdentitySet<>();
-
- if (checkedObjs != null && !checkedObjs.add(target))
+ // Check if already inspected to avoid indefinite recursion.
+ if (!checkedObjs.add(target))
return false;
+ int annCnt = 0;
+
boolean injected = false;
- for (Field field : descr.recursiveFields()) {
- try {
- Object obj = field.get(target);
+ for (GridResourceField field : getFieldsWithAnnotation(dep, targetCls, annCls)) {
+ if (field.processFieldValue()) {
+ Field f = field.getField();
- if (obj != null) {
- assert checkedObjs != null;
+ try {
+ Object obj = f.get(target);
- injected |= injectInternal(obj, annCls, injector, dep, depCls, checkedObjs);
+ if (obj != null) {
+ // Recursion.
+ boolean injected0 = injectInternal(obj, annCls, injector, dep, depCls, checkedObjs);
+
+ injected |= injected0;
+ }
+ }
+ catch (IllegalAccessException e) {
+ throw new IgniteCheckedException("Failed to inject resource [field=" + f.getName() +
+ ", target=" + target + ']', e);
}
}
- catch (IllegalAccessException e) {
- throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() +
- ", target=" + target + ']', e);
- }
- }
-
- if (annotatedMembers != null) {
- for (GridResourceField field : annotatedMembers.get1()) {
+ else {
injector.inject(field, target, depCls, dep);
injected = true;
}
- for (GridResourceMethod mtd : annotatedMembers.get2()) {
- injector.inject(mtd, target, depCls, dep);
+ annCnt++;
+ }
- injected = true;
- }
+ for (GridResourceMethod mtd : getMethodsWithAnnotation(dep, targetCls, annCls)) {
+ injector.inject(mtd, target, depCls, dep);
+
+ injected = true;
+
+ annCnt++;
+ }
+
+ if (annCnt == 0) {
+ if (skipClss == null)
+ skipClss = F.addIfAbsent(skipCache, annCls, F.<Class<?>>newCSet());
+
+ assert skipClss != null;
+
+ skipClss.add(targetCls);
}
return injected;
@@ -188,9 +217,29 @@ class GridResourceIoc {
assert target != null;
assert annCls != null;
- ClassDescriptor desc = descriptor(dep, target.getClass());
+ Class<?> targetCls = target.getClass();
+
+ Set<Class<?>> skipClss = skipCache.get(annCls);
+
+ if (skipClss != null && skipClss.contains(targetCls))
+ return false;
+
+ GridResourceField[] fields = getFieldsWithAnnotation(dep, targetCls, annCls);
+
+ if (fields.length > 0)
+ return true;
- return desc.recursiveFields().length > 0 || desc.annotatedMembers(annCls) != null;
+ GridResourceMethod[] mtds = getMethodsWithAnnotation(dep, targetCls, annCls);
+
+ if (mtds.length > 0)
+ return true;
+
+ if (skipClss == null)
+ skipClss = F.addIfAbsent(skipCache, annCls, F.<Class<?>>newCSet());
+
+ skipClss.add(targetCls);
+
+ return false;
}
/**
@@ -211,14 +260,17 @@ class GridResourceIoc {
Class<? extends Annotation>[] res = annCache.get(cls);
if (res == null) {
- Collection<Class<? extends Annotation>> res0 = new ArrayList<>();
+ Collection<Class<? extends Annotation>> res0 =
+ new HashSet<>(annClss.size(), 1.0f);
for (Class<? extends Annotation> annCls : annClss) {
if (isAnnotationPresent(target, annCls, dep))
res0.add(annCls);
}
- res = res0.toArray(new Class[res0.size()]);
+ res = new Class[res0.size()];
+
+ res0.toArray(res);
annCache.putIfAbsent(cls, res);
}
@@ -227,6 +279,36 @@ class GridResourceIoc {
}
/**
+ * For tests only.
+ *
+ * @param cls Class for test.
+ * @return {@code true} if cached, {@code false} otherwise.
+ */
+ boolean isCached(Class<?> cls) {
+ return isCached(cls.getName());
+ }
+
+ /**
+ * For tests only.
+ *
+ * @param clsName Class for test.
+ * @return {@code true} if cached, {@code false} otherwise.
+ */
+ boolean isCached(String clsName) {
+ for (Class<?> aClass : fieldCache.keySet()) {
+ if (aClass.getName().equals(clsName))
+ return true;
+ }
+
+ for (Class<?> aClass : mtdCache.keySet()) {
+ if (aClass.getName().equals(clsName))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* Gets set of methods with given annotation.
*
* @param dep Deployment.
@@ -236,111 +318,156 @@ class GridResourceIoc {
*/
GridResourceMethod[] getMethodsWithAnnotation(@Nullable GridDeployment dep, Class<?> cls,
Class<? extends Annotation> annCls) {
- ClassDescriptor desc = descriptor(dep, cls);
+ GridResourceMethod[] mtds = getMethodsFromCache(cls, annCls);
- T2<GridResourceField[], GridResourceMethod[]> t2 = desc.annotatedMembers(annCls);
+ if (mtds == null) {
+ List<GridResourceMethod> mtdsList = new ArrayList<>();
- return t2 == null ? GridResourceMethod.EMPTY_ARRAY : t2.get2();
- }
+ for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
+ for (Method mtd : cls0.getDeclaredMethods()) {
+ Annotation ann = mtd.getAnnotation(annCls);
- /** {@inheritDoc} */
- public void printMemoryStats() {
- X.println(">>> taskMapSize: " + taskMap.size());
- X.println(">>> classDescriptorsCacheSize: " + clsDescs.size());
+ if (ann != null)
+ mtdsList.add(new GridResourceMethod(mtd, ann));
+ }
+ }
+
+ if (mtdsList.isEmpty())
+ mtds = GridResourceMethod.EMPTY_ARRAY;
+ else
+ mtds = mtdsList.toArray(new GridResourceMethod[mtdsList.size()]);
+
+ cacheMethods(dep, cls, annCls, mtds);
+ }
+
+ return mtds;
}
/**
+ * Gets all entries from the specified class or its super-classes that have
+ * been annotated with annotation provided.
*
+ * @param cls Class in which search for methods.
+ * @param dep Deployment.
+ * @param annCls Annotation.
+ * @return Set of entries with given annotations.
*/
- private static class ClassDescriptor {
- /** */
- private final Field[] recursiveFields;
-
- /** */
- private final Map<Class<? extends Annotation>, T2<GridResourceField[], GridResourceMethod[]>> annMap;
-
- /**
- * @param cls Class.
- */
- ClassDescriptor(Class<?> cls) {
- Map<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> annMap
- = new HashMap<>();
+ private GridResourceField[] getFieldsWithAnnotation(@Nullable GridDeployment dep, Class<?> cls,
+ Class<? extends Annotation> annCls) {
+ GridResourceField[] fields = getFieldsFromCache(cls, annCls);
- List<Field> recursiveFieldsList = new ArrayList<>();
+ if (fields == null) {
+ List<GridResourceField> fieldsList = new ArrayList<>();
boolean allowImplicitInjection = !GridNoImplicitInjection.class.isAssignableFrom(cls);
for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
for (Field field : cls0.getDeclaredFields()) {
- Annotation[] fieldAnns = field.getAnnotations();
+ Annotation ann = field.getAnnotation(annCls);
- for (Annotation ann : fieldAnns) {
- T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
+ if (ann != null)
+ fieldsList.add(new GridResourceField(field, ann));
+ else if (allowImplicitInjection && GridResourceUtils.mayRequireResources(field)) {
+ // Account for anonymous inner classes.
+ fieldsList.add(new GridResourceField(field, null));
+ }
+ }
+ }
- if (t2 == null) {
- t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
- new ArrayList<GridResourceField>(),
- new ArrayList<GridResourceMethod>());
+ if (fieldsList.isEmpty())
+ fields = GridResourceField.EMPTY_ARRAY;
+ else
+ fields = fieldsList.toArray(new GridResourceField[fieldsList.size()]);
- annMap.put(ann.annotationType(), t2);
- }
+ cacheFields(dep, cls, annCls, fields);
+ }
- t2.get1().add(new GridResourceField(field, ann));
- }
+ return fields;
+ }
- if (allowImplicitInjection
- && fieldAnns.length == 0
- && GridResourceUtils.mayRequireResources(field)) {
- field.setAccessible(true);
+ /**
+ * Gets all fields for a given class with given annotation from cache.
+ *
+ * @param cls Class to get fields from.
+ * @param annCls Annotation class for fields.
+ * @return List of fields with given annotation, possibly {@code null}.
+ */
+ @Nullable private GridResourceField[] getFieldsFromCache(Class<?> cls, Class<? extends Annotation> annCls) {
+ Map<Class<? extends Annotation>, GridResourceField[]> annCache = fieldCache.get(cls);
- // Account for anonymous inner classes.
- recursiveFieldsList.add(field);
- }
- }
+ return annCache != null ? annCache.get(annCls) : null;
+ }
- for (Method mtd : cls0.getDeclaredMethods()) {
- for (Annotation ann : mtd.getAnnotations()) {
- T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
+ /**
+ * Caches list of fields with given annotation from given class.
+ *
+ * @param cls Class the fields belong to.
+ * @param dep Deployment.
+ * @param annCls Annotation class for the fields.
+ * @param fields Fields to cache.
+ */
+ private void cacheFields(@Nullable GridDeployment dep, Class<?> cls, Class<? extends Annotation> annCls,
+ GridResourceField[] fields) {
+ if (dep != null) {
+ Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
- if (t2 == null) {
- t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
- new ArrayList<GridResourceField>(),
- new ArrayList<GridResourceMethod>());
+ assert classes != null;
- annMap.put(ann.annotationType(), t2);
- }
+ classes.add(cls);
+ }
- t2.get2().add(new GridResourceMethod(mtd, ann));
- }
- }
- }
+ Map<Class<? extends Annotation>, GridResourceField[]> rsrcFields =
+ F.addIfAbsent(fieldCache, cls, F.<Class<? extends Annotation>, GridResourceField[]>newCMap());
- recursiveFields = recursiveFieldsList.isEmpty() ? U.EMPTY_FIELDS
- : recursiveFieldsList.toArray(new Field[recursiveFieldsList.size()]);
+ assert rsrcFields != null;
- this.annMap = IgniteUtils.limitedMap(annMap.size());
+ rsrcFields.put(annCls, fields);
+ }
- for (Map.Entry<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> entry
- : annMap.entrySet()) {
- GridResourceField[] fields = GridResourceField.toArray(entry.getValue().get1());
- GridResourceMethod[] mtds = GridResourceMethod.toArray(entry.getValue().get2());
+ /**
+ * Gets all methods for a given class with given annotation from cache.
+ *
+ * @param cls Class to get methods from.
+ * @param annCls Annotation class for fields.
+ * @return List of methods with given annotation, possibly {@code null}.
+ */
+ @Nullable private GridResourceMethod[] getMethodsFromCache(Class<?> cls, Class<? extends Annotation> annCls) {
+ Map<Class<? extends Annotation>, GridResourceMethod[]> annCache = mtdCache.get(cls);
- this.annMap.put(entry.getKey(), new T2<>(fields, mtds));
- }
- }
+ return annCache != null ? annCache.get(annCls) : null;
+ }
- /**
- * @return Recursive fields.
- */
- public Field[] recursiveFields() {
- return recursiveFields;
- }
+ /**
+ * Caches list of methods with given annotation from given class.
+ *
+ * @param rsrcCls Class the fields belong to.
+ * @param dep Deployment.
+ * @param annCls Annotation class for the fields.
+ * @param mtds Methods to cache.
+ */
+ private void cacheMethods(@Nullable GridDeployment dep, Class<?> rsrcCls, Class<? extends Annotation> annCls,
+ GridResourceMethod[] mtds) {
+ if (dep != null) {
+ Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
+
+ assert classes != null;
- /**
- * @return Fields.
- */
- @Nullable public T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) {
- return annMap.get(annCls);
+ classes.add(rsrcCls);
}
+
+ Map<Class<? extends Annotation>, GridResourceMethod[]> rsrcMtds = F.addIfAbsent(mtdCache,
+ rsrcCls, F.<Class<? extends Annotation>, GridResourceMethod[]>newCMap());
+
+ assert rsrcMtds != null;
+
+ rsrcMtds.put(annCls, mtds);
+ }
+
+ /** {@inheritDoc} */
+ public void printMemoryStats() {
+ X.println(">>> taskMapSize: " + taskMap.size());
+ X.println(">>> fieldCacheSize: " + fieldCache.size());
+ X.println(">>> mtdCacheSize: " + mtdCache.size());
+ X.println(">>> skipCacheSize: " + skipCache.size());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
index ad08a40..aba9405 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceMethod.java
@@ -21,7 +21,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import java.lang.annotation.*;
import java.lang.reflect.*;
-import java.util.*;
/**
* Wrapper for data where resource should be injected.
@@ -49,8 +48,6 @@ class GridResourceMethod {
this.mtd = mtd;
this.ann = ann;
-
- mtd.setAccessible(true);
}
/**
@@ -71,16 +68,6 @@ class GridResourceMethod {
return ann;
}
- /**
- * @param c Closure.
- */
- public static GridResourceMethod[] toArray(Collection<GridResourceMethod> c) {
- if (c.isEmpty())
- return EMPTY_ARRAY;
-
- return c.toArray(new GridResourceMethod[c.size()]);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridResourceMethod.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index f5ba492..f08a287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lifecycle.*;
import org.apache.ignite.resources.*;
@@ -146,8 +147,8 @@ public class GridResourceProcessor extends GridProcessorAdapter {
Method mtd = rsrcMtd.getMethod();
try {
- // No need to call mtd.setAccessible(true);
- // It has been called in GridResourceMethod constructor.
+ mtd.setAccessible(true);
+
mtd.invoke(target);
}
catch (IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
@@ -277,7 +278,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
log.debug("Injecting resources: " + job);
// Unwrap Proxy object.
- Object obj = unwrapTarget(job);
+ Object obj = unwrapTarget(unwrapJob(job));
injectToJob(dep, taskCls, obj, ses, jobCtx);
@@ -328,6 +329,19 @@ public class GridResourceProcessor extends GridProcessorAdapter {
}
/**
+ * Gets rid of job wrapper, if any.
+ *
+ * @param job Job to unwrap.
+ * @return Unwrapped job.
+ */
+ private ComputeJob unwrapJob(ComputeJob job) {
+ if (job instanceof GridComputeJobWrapper)
+ return ((GridComputeJobWrapper)job).wrappedJob();
+
+ return job;
+ }
+
+ /**
* Injects held resources into given grid task.
*
* @param dep Deployed class.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 80fcbb0..2ab8e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -109,13 +109,10 @@ public abstract class IgniteUtils {
private static final int[] GRID_EVTS;
/** Empty integers array. */
- public static final int[] EMPTY_INTS = new int[0];
+ private static final int[] EMPTY_INTS = new int[0];
/** Empty longs. */
- public static final long[] EMPTY_LONGS = new long[0];
-
- /** Empty longs. */
- public static final Field[] EMPTY_FIELDS = new Field[0];
+ private static final long[] EMPTY_LONGS = new long[0];
/** System line separator. */
private static final String NL = System.getProperty("line.separator");
@@ -8859,21 +8856,6 @@ public abstract class IgniteUtils {
}
/**
- * Creates new map that limited by size.
- *
- * @param limit Limit for size.
- */
- public static <K, V> Map<K, V> limitedMap(int limit) {
- if (limit == 0)
- return Collections.emptyMap();
-
- if (limit < 5)
- return new GridLeanMap<>(limit);
-
- return new HashMap<>(capacity(limit), 0.75f);
- }
-
- /**
* Returns comparator that sorts remote node addresses. If remote node resides on the same host, then put
* loopback addresses first, last otherwise.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java
new file mode 100644
index 0000000..82c0078
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridComputeJobWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Convenient wrapper for grid job. It allows to create a job clone in cases when the same
+ * job needs to be cloned to multiple grid nodes during mapping phase of task execution.
+ */
+public class GridComputeJobWrapper implements ComputeJob, Callable<Object>,
+ GridPeerDeployAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final ComputeJob job;
+
+ /** Peer deploy aware class. */
+ private transient volatile GridPeerDeployAware p;
+
+ /**
+ * Creates a wrapper with given grid {@code job}.
+ *
+ * @param job Job to wrap.
+ */
+ public GridComputeJobWrapper(ComputeJob job) {
+ A.notNull(job, "job");
+
+ this.job = job;
+ }
+
+ /**
+ * Gets wrapped job.
+ *
+ * @return Wrapped job.
+ */
+ public ComputeJob wrappedJob() {
+ return job;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public final Object call() throws Exception {
+ return execute();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class<?> deployClass() {
+ if (p == null)
+ p = U.detectPeerDeployAware(this);
+
+ return p.deployClass();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClassLoader classLoader() {
+ if (p == null)
+ p = U.detectPeerDeployAware(this);
+
+ return p.classLoader();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ job.cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() {
+ return job.execute();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridComputeJobWrapper.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index a03d2c8..3f81dc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
import java.nio.*;
@@ -32,6 +33,9 @@ import java.nio.*;
* | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE |
* +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
* </pre>
+ * <p>
+ * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
+ * isn't equal to these bytes than exception will be thrown.
*/
public class GridBufferedParser implements GridNioParser {
/** Buffer metadata key. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
deleted file mode 100644
index 256597c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
- * <p>
- * The rule for this parser is that every message sent over the stream is appended with
- * delimiter (bytes array). So, the stream structure is as follows:
- * <pre>
- * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
- * | MESSAGE | DELIMITER | MESSAGE | DELIMITER |
- * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
- * </pre>
- */
-public class GridDelimitedParser implements GridNioParser {
- /** Buffer metadata key. */
- private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
-
- /** Delimiter. */
- private final byte[] delim;
-
- /** Direct buffer. */
- private final boolean directBuf;
-
- /**
- * @param delim Delimiter.
- * @param directBuf Direct buffer.
- */
- public GridDelimitedParser(byte[] delim, boolean directBuf) {
- this.delim = delim;
- this.directBuf = directBuf;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
- GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY);
-
- // Decode for a given session is called per one thread, so there should not be any concurrency issues.
- // However, we make some additional checks.
- if (nioBuf == null) {
- nioBuf = new GridNioDelimitedBuffer(delim);
-
- GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
-
- assert old == null;
- }
-
- return nioBuf.read(buf);
- }
-
- /** {@inheritDoc} */
- @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
- byte[] msg0 = (byte[])msg;
-
- int cap = msg0.length + delim.length;
- ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(cap) : ByteBuffer.allocate(cap);
-
- res.put(msg0);
- res.put(delim);
-
- res.flip();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return this.getClass().getSimpleName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
deleted file mode 100644
index 2b764ec..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * Buffer with message delimiter support.
- */
-public class GridNioDelimitedBuffer {
- /** Delimiter. */
- private final byte[] delim;
-
- /** Data. */
- private byte[] data = new byte[16384];
-
- /** Count. */
- private int cnt;
-
- /** Index. */
- private int idx;
-
- /**
- * @param delim Delimiter.
- */
- public GridNioDelimitedBuffer(byte[] delim) {
- assert delim != null;
- assert delim.length > 0;
-
- this.delim = delim;
-
- reset();
- }
-
- /**
- * Resets buffer state.
- */
- private void reset() {
- cnt = 0;
- idx = 0;
- }
-
- /**
- * @param buf Buffer.
- * @return Message bytes or {@code null} if message is not fully read yet.
- */
- @Nullable public byte[] read(ByteBuffer buf) {
- while(buf.hasRemaining()) {
- if (cnt == data.length)
- data = Arrays.copyOf(data, data.length * 2);
-
- byte b = buf.get();
-
- data[cnt++] = b;
-
- if (b == delim[idx])
- idx++;
- else if (idx > 0) {
- int pos = cnt - idx;
-
- idx = 0;
-
- for (int i = pos; i < cnt; i++) {
- if (data[pos] == delim[idx]) {
- pos++;
-
- idx++;
- }
- else {
- pos = cnt - idx;
-
- idx = 0;
- }
- }
- }
-
- if (idx == delim.length) {
- byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
-
- reset();
-
- return bytes;
- }
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index fd17791..2d5c541 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -203,7 +203,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final int DFLT_ACK_SND_THRESHOLD = 16;
/** Default socket write timeout. */
- public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
+ public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 4cff45b..27fa473 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2689,26 +2689,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
msgLsnr.apply(msg);
if (redirectToClients(msg)) {
- ByteBuffer marshalledMsg = null;
-
- for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
- // Send a clone to client to avoid ConcurrentModificationException
- TcpDiscoveryAbstractMessage msgClone;
-
- try {
- if (marshalledMsg == null)
- marshalledMsg = marsh.marshal(msg);
-
- msgClone = marsh.unmarshal(marshalledMsg, null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal message: " + msg, e);
-
- msgClone = msg;
- }
-
- clientMsgWorker.addMessage(msgClone);
- }
+ for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values())
+ clientMsgWorker.addMessage(msg);
}
Collection<TcpDiscoveryNode> failedNodes;
@@ -3951,6 +3933,18 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
long topVer;
if (locNodeCoord) {
+ if (!msg.client() && ipFinder.isShared()) {
+ try {
+ ipFinder.unregisterAddresses(leftNode.socketAddresses());
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to unregister left node address: " + leftNode);
+
+ onException("Failed to unregister left node address: " + leftNode, e);
+ }
+ }
+
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
@@ -4118,6 +4112,20 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
long topVer;
if (locNodeCoord) {
+ if (!node.isClient() && ipFinder.isShared()) {
+ try {
+ ipFinder.unregisterAddresses(node.socketAddresses());
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to unregister failed node address [node=" + node +
+ ", err=" + e.getMessage() + ']');
+
+ onException("Failed to unregister failed node address [node=" + node +
+ ", err=" + e.getMessage() + ']', e);
+ }
+ }
+
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index c52cbc1..6b3f068 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -55,11 +55,11 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
/** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
public static final long DFLT_SOCK_TIMEOUT = 200;
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>200ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 200;
+ /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT = 5000;
- /** Default network timeout in milliseconds (value is <tt>200ms</tt>). */
- public static final long DFLT_NETWORK_TIMEOUT = 200;
+ /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+ public static final long DFLT_NETWORK_TIMEOUT = 5000;
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
deleted file mode 100644
index 0c4e2d1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream;
-
-import org.apache.ignite.*;
-
-import java.util.*;
-
-/**
- * Convenience adapter for streamers. Adapters are optional components for
- * streaming from different data sources. The purpose of adapters is to
- * convert different message formats into Ignite stream key-value tuples
- * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
- */
-public abstract class StreamAdapter<T, K, V> {
- /** Tuple extractor. */
- private StreamTupleExtractor<T, K, V> extractor;
-
- /** Streamer. */
- private IgniteDataStreamer<K, V> stmr;
-
- /** Ignite. */
- private Ignite ignite;
-
- /**
- * Empty constructor.
- */
- protected StreamAdapter() {
- // No-op.
- }
-
- /**
- * Stream adapter.
- *
- * @param stmr Streamer.
- * @param extractor Tuple extractor.
- */
- protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
- this.stmr = stmr;
- this.extractor = extractor;
- }
-
- /**
- * @return Provided data streamer.
- */
- public IgniteDataStreamer<K, V> getStreamer() {
- return stmr;
- }
-
- /**
- * @param stmr Ignite data streamer.
- */
- public void setStreamer(IgniteDataStreamer<K, V> stmr) {
- this.stmr = stmr;
- }
-
- /**
- * @return Provided tuple extractor.
- */
- public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- return extractor;
- }
-
- /**
- * @param extractor Extractor for key-value tuples from messages.
- */
- public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- this.extractor = extractor;
- }
-
- /**
- * @return Provided {@link Ignite} instance.
- */
- public Ignite getIgnite() {
- return ignite;
- }
-
- /**
- * @param ignite {@link Ignite} instance.
- */
- public void setIgnite(Ignite ignite) {
- this.ignite = ignite;
- }
-
- /**
- * Converts given message to a tuple and adds it to the underlying streamer.
- *
- * @param msg Message to convert.
- */
- protected void addMessage(T msg) {
- Map.Entry<K, V> e = extractor.extract(msg);
-
- if (e != null)
- stmr.addData(e);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
deleted file mode 100644
index d2a4ede..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream;
-
-import java.util.*;
-
-/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- */
-public interface StreamTupleExtractor<T, K, V> {
- /**
- * Extracts a key-value tuple from a message.
- *
- * @param msg Message.
- * @return Key-value tuple.
- */
- public Map.Entry<K, V> extract(T msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
deleted file mode 100644
index 8161d86..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-/**
- * Socket message converter.
- */
-public interface SocketMessageConverter<T> {
- /**
- * Converter message represented by array of bytes to object.
- *
- * @param msg Message.
- * @return Converted object.
- */
- public T convert(byte[] msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
deleted file mode 100644
index d308897..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.stream.*;
-
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.nio.*;
-
-/**
- * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
- * streams into {@link IgniteDataStreamer} instance.
- * <p>
- * By default server uses size-based message processing. That is every message sent over the socket is prepended with
- * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
- * delimiter-based message processing will be used. That is every message sent over the socket is appended with
- * provided delimiter.
- * <p>
- * Received messages through socket converts to Java object using standard serialization. Conversion functionality
- * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
- * non Java clients).
- */
-public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
- /** Default threads. */
- private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Address. */
- private InetAddress addr;
-
- /** Server port. */
- private int port;
-
- /** Threads number. */
- private int threads = DFLT_THREADS;
-
- /** Direct mode. */
- private boolean directMode;
-
- /** Delimiter. */
- private byte[] delim;
-
- /** Converter. */
- private SocketMessageConverter<T> converter;
-
- /** Server. */
- private GridNioServer<byte[]> srv;
-
- /**
- * Sets server address.
- *
- * @param addr Address.
- */
- public void setAddr(InetAddress addr) {
- this.addr = addr;
- }
-
- /**
- * Sets port number.
- *
- * @param port Port.
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Sets threadds amount.
- *
- * @param threads Threads.
- */
- public void setThreads(int threads) {
- this.threads = threads;
- }
-
- /**
- * Sets direct mode flag.
- *
- * @param directMode Direct mode.
- */
- public void setDirectMode(boolean directMode) {
- this.directMode = directMode;
- }
-
- /**
- * Sets message delimiter.
- *
- * @param delim Delimiter.
- */
- public void setDelimiter(byte[] delim) {
- this.delim = delim;
- }
-
- /**
- * Sets message converter.
- *
- * @param converter Converter.
- */
- public void setConverter(SocketMessageConverter<T> converter) {
- this.converter = converter;
- }
-
- /**
- * Starts streamer.
- *
- * @throws IgniteException If failed.
- */
- public void start() {
- A.notNull(getTupleExtractor(), "tupleExtractor");
- A.notNull(getStreamer(), "streamer");
- A.notNull(getIgnite(), "ignite");
- A.ensure(threads > 0, "threads > 0");
-
- log = getIgnite().log();
-
- GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
- @Override public void onConnected(GridNioSession ses) {
- assert ses.accepted();
-
- if (log.isDebugEnabled())
- log.debug("Accepted connection: " + ses.remoteAddress());
- }
-
- @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- if (e != null)
- log.error("Connection failed with exception", e);
- }
-
- @Override public void onMessage(GridNioSession ses, byte[] msg) {
- addMessage(converter.convert(msg));
- }
- };
-
- ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
-
- GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
- new GridDelimitedParser(delim, directMode);
-
- if (converter == null)
- converter = new DefaultConverter<>();
-
- GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
-
- GridNioFilter[] filters = new GridNioFilter[] {codec};
-
- try {
- srv = new GridNioServer.Builder<byte[]>()
- .address(addr == null ? InetAddress.getLocalHost() : addr)
- .port(port)
- .listener(lsnr)
- .logger(log)
- .selectorCount(threads)
- .byteOrder(byteOrder)
- .filters(filters)
- .build();
- }
- catch (IgniteCheckedException | UnknownHostException e) {
- throw new IgniteException(e);
- }
-
- srv.start();
-
- if (log.isDebugEnabled())
- log.debug("Socket streaming server started on " + addr + ':' + port);
- }
-
- /**
- * Stops streamer.
- */
- public void stop() {
- srv.stop();
-
- if (log.isDebugEnabled())
- log.debug("Socket streaming server stopped");
- }
-
- /**
- * Converts message to Java object using Jdk marshaller.
- */
- private static class DefaultConverter<T> implements SocketMessageConverter<T> {
- /** Marshaller. */
- private static final JdkMarshaller MARSH = new JdkMarshaller();
-
- /** {@inheritDoc} */
- @Override public T convert(byte[] msg) {
- try {
- return MARSH.unmarshal(ByteBuffer.wrap(msg), null);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
deleted file mode 100644
index e1cef65..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains socket streamer implementation.
- */
-package org.apache.ignite.stream.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 4f4c1ae..1d4a652 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1140,6 +1140,7 @@ org.apache.ignite.internal.util.lang.GridAbsClosure
org.apache.ignite.internal.util.lang.GridAbsClosureX
org.apache.ignite.internal.util.lang.GridCloseableIterator
org.apache.ignite.internal.util.lang.GridClosureException
+org.apache.ignite.internal.util.lang.GridComputeJobWrapper
org.apache.ignite.internal.util.lang.GridFunc$1
org.apache.ignite.internal.util.lang.GridFunc$10
org.apache.ignite.internal.util.lang.GridFunc$100
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
deleted file mode 100644
index 8c7d33d..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.local.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMemoryMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Cache map entry self test.
- */
-public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(1);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- // No-op.
- }
-
- /**
- * @param gridName Grid name.
- * @param memoryMode Memory mode.
- * @param atomicityMode Atomicity mode.
- * @param cacheMode Cache mode.
- * @param cacheName Cache name.
- * @return Cache configuration.
- * @throws Exception If failed.
- */
- private CacheConfiguration cacheConfiguration(String gridName,
- CacheMemoryMode memoryMode,
- CacheAtomicityMode atomicityMode,
- CacheMode cacheMode,
- String cacheName)
- throws Exception
- {
- CacheConfiguration cfg = super.cacheConfiguration(gridName);
-
- cfg.setCacheMode(cacheMode);
- cfg.setAtomicityMode(atomicityMode);
- cfg.setMemoryMode(memoryMode);
- cfg.setName(cacheName);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCacheMapEntry() throws Exception {
- checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, LOCAL, GridLocalCacheEntry.class);
-
- checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
-
- checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, PARTITIONED, GridNearCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
- checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
-
- checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
-
- checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
-
- checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
- }
-
- /**
- * @param memoryMode Cache memory mode.
- * @param atomicityMode Cache atomicity mode.
- * @param cacheMode Cache mode.
- * @param entryCls Class of cache map entry.
- * @throws Exception If failed.
- */
- private void checkCacheMapEntry(CacheMemoryMode memoryMode,
- CacheAtomicityMode atomicityMode,
- CacheMode cacheMode,
- Class<?> entryCls)
- throws Exception
- {
- log.info("Test cache [memMode=" + memoryMode +
- ", atomicityMode=" + atomicityMode +
- ", cacheMode=" + cacheMode + ']');
-
- CacheConfiguration cfg = cacheConfiguration(grid(0).name(),
- memoryMode,
- atomicityMode,
- cacheMode,
- "Cache");
-
- try (IgniteCache jcache = grid(0).getOrCreateCache(cfg)) {
- GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
-
- Integer key = primaryKey(grid(0).cache(null));
-
- cache.put(key, "val");
-
- GridCacheEntryEx entry = cache.entryEx(key);
-
- entry.unswap(true);
-
- assertNotNull(entry);
-
- assertEquals(entry.getClass(), entryCls);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
deleted file mode 100644
index f5de96f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.*;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Test remove all method.
- */
-public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 60000;
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 4;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRemoveAll() throws Exception {
- IgniteCache<Integer, String> cache = grid(0).cache(null);
-
- for (int i = 0; i < 10_000; ++i)
- cache.put(i, "val");
-
- final AtomicInteger igniteId = new AtomicInteger(gridCount());
-
- IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- for (int i = 0; i < 2; ++i)
- startGrid(igniteId.getAndIncrement());
-
- return true;
- }
- }, 3, "start-node-thread");
-
- cache.removeAll();
-
- fut.get();
-
- U.sleep(5000);
-
- for (int i = 0; i < igniteId.get(); ++i) {
- IgniteCache locCache = grid(i).cache(null);
-
- assertEquals("Local size: " + locCache.localSize() + "\n" +
- "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" +
- "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" +
- "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" +
- "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" +
- "Backup: " + locCache.localSize(CachePeekMode.BACKUP),
- 0, locCache.localSize());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 5d9ad35..5389ef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -25,12 +25,12 @@ import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
+import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
@@ -70,12 +70,6 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
- TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
-
- discoSpi.setSocketTimeout(10_000);
- discoSpi.setAckTimeout(10_000);
- discoSpi.setNetworkTimeout(10_000);
-
return cfg;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 7e65f23..85e2c7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -105,10 +106,6 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
disco.setIpFinder(ipFinder);
- disco.setSocketTimeout(30_000);
- disco.setAckTimeout(30_000);
- disco.setNetworkTimeout(30_000);
-
c.setDiscoverySpi(disco);
return c;
@@ -515,7 +512,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
try {
cache.put(key, Integer.toString(key));
}
- catch (IgniteException | CacheException ignored) {
+ catch (TransactionRollbackException | ClusterTopologyException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
@@ -671,7 +668,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
tx.commit();
}
}
- catch (IgniteException | CacheException ignored) {
+ catch (ClusterTopologyException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
@@ -817,7 +814,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
tx.commit();
}
- catch (IgniteException | CacheException ignored) {
+ catch (ClusterTopologyException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
index 2fe76e7..ab0f7d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
@@ -501,6 +501,8 @@ public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest {
* @throws Throwable If failed.
*/
public void testLockReentrancy() throws Throwable {
+ fail("https://issues.apache.org/jira/browse/IGNITE-835");
+
Affinity<Integer> aff = ignite1.affinity(null);
for (int i = 10; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
deleted file mode 100644
index 469f513..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- *
- */
-public class IgniteTxGetAfterStopTest extends IgniteCacheAbstractTest {
- /** */
- private CacheMode cacheMode;
-
- /** */
- private NearCacheConfiguration nearCfg;
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 4;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return cacheMode;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return TRANSACTIONAL;
- }
-
- /** {@inheritDoc} */
- @Override protected NearCacheConfiguration nearConfiguration() {
- return nearCfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicated() throws Exception {
- getAfterStop(REPLICATED, null);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitioned() throws Exception {
- getAfterStop(PARTITIONED, new NearCacheConfiguration());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionedNearDisabled() throws Exception {
- getAfterStop(PARTITIONED, null);
- }
-
- /**
- * @param cacheMode Cache mode.
- * @param nearCfg Near cache configuration.
- * @throws Exception If failed.
- */
- private void getAfterStop(CacheMode cacheMode, @Nullable NearCacheConfiguration nearCfg) throws Exception {
- this.cacheMode = cacheMode;
- this.nearCfg = nearCfg;
-
- startGrids();
-
- IgniteCache<Integer, Integer> cache0 = jcache(0);
- IgniteCache<Integer, Integer> cache1 = jcache(1);
-
- Integer key0 = primaryKey(cache0);
- Integer key1 = primaryKey(cache1);
-
- try (Transaction tx = ignite(0).transactions().txStart()) {
- log.info("Put: " + key0);
-
- cache0.put(key0, key0);
-
- log.info("Stop node.");
-
- stopGrid(3);
-
- log.info("Get: " + key1);
-
- cache0.get(key1);
-
- log.info("Commit.");
-
- tx.commit();
- }
-
- assertEquals(key0, cache0.get(key0));
- assertNull(cache1.get(key1));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java
deleted file mode 100644
index 69c7909..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-
-/**
- *
- */
-public class GridCachePartitionedNearDisabledLockSelfTest extends GridCachePartitionedLockSelfTest {
- /** {@inheritDoc} */
- @Override protected CacheConfiguration cacheConfiguration() {
- CacheConfiguration ccfg = super.cacheConfiguration();
-
- assertNotNull(ccfg.getNearConfiguration());
-
- ccfg.setNearConfiguration(null);
-
- return ccfg;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean isPartitioned() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void testLockReentrancy() throws Throwable {
- fail("https://issues.apache.org/jira/browse/IGNITE-835");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index f996877..ee2f16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -199,7 +199,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
waitPrepared(ignite(1));
@@ -360,7 +360,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
waitPrepared(ignite(1));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
index 1e57c09..0a2781b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
import static org.apache.ignite.cache.CacheMode.*;
@@ -30,9 +29,4 @@ public class GridCacheAtomicReplicatedFailoverSelfTest extends GridCacheAtomicFa
@Override protected CacheMode cacheMode() {
return REPLICATED;
}
-
- /** {@inheritDoc} */
- @Override protected NearCacheConfiguration nearConfiguration() {
- return null;
- }
}