You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/24 13:24:19 UTC
[09/50] [abbrv] ignite git commit: GG-11655 - Restore service
compatibility with releases before 1.5.30.
GG-11655 - Restore service compatibility with releases before 1.5.30.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/950bad47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/950bad47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/950bad47
Branch: refs/heads/ignite-4242
Commit: 950bad474ef29f9b808e74034c49a69d57eb2740
Parents: 175da6b
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Nov 8 14:03:34 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Nov 8 14:03:34 2016 +0300
----------------------------------------------------------------------
.../internal/processors/job/GridJobWorker.java | 10 ++-
.../service/GridServiceProcessor.java | 61 ++++++++++++-
.../internal/util/SerializableTransient.java | 58 +++++++++++++
.../ignite/marshaller/MarshallerUtils.java | 22 +++++
.../optimized/OptimizedClassDescriptor.java | 90 +++++++++++++++++++-
5 files changed, 234 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 8169eb1..5f38b29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -57,6 +57,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
@@ -421,7 +422,14 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
try {
if (job == null) {
- job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ MarshallerUtils.jobSenderVersion(taskNode.version());
+
+ try {
+ job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ }
+ finally {
+ MarshallerUtils.jobSenderVersion(null);
+ }
// No need to hold reference any more.
jobBytes = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 527d360..6c26363 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
@@ -87,6 +90,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.internal.util.SerializableTransient;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
@@ -115,6 +119,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/** */
public static final IgniteProductVersion LAZY_SERVICES_CFG_SINCE = IgniteProductVersion.fromString("1.5.22");
+ /** Versions that only compatible with each other, and from 1.5.33. */
+ private static final Set<IgniteProductVersion> SERVICE_TOP_CALLABLE_VER1;
+
/** */
private final Boolean srvcCompatibilitySysProp;
@@ -162,6 +169,31 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/** Topology listener. */
private GridLocalEventListener topLsnr = new TopologyListener();
+ static {
+ Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() {
+ @Override public int compare(final IgniteProductVersion o1, final IgniteProductVersion o2) {
+ return o1.compareToIgnoreTimestamp(o2);
+ }
+ });
+
+ versions.add(IgniteProductVersion.fromString("1.5.30"));
+ versions.add(IgniteProductVersion.fromString("1.5.31"));
+ versions.add(IgniteProductVersion.fromString("1.5.32"));
+ versions.add(IgniteProductVersion.fromString("1.6.3"));
+ versions.add(IgniteProductVersion.fromString("1.6.4"));
+ versions.add(IgniteProductVersion.fromString("1.6.5"));
+ versions.add(IgniteProductVersion.fromString("1.6.6"));
+ versions.add(IgniteProductVersion.fromString("1.6.7"));
+ versions.add(IgniteProductVersion.fromString("1.6.8"));
+ versions.add(IgniteProductVersion.fromString("1.6.9"));
+ versions.add(IgniteProductVersion.fromString("1.6.10"));
+ versions.add(IgniteProductVersion.fromString("1.7.0"));
+ versions.add(IgniteProductVersion.fromString("1.7.1"));
+ versions.add(IgniteProductVersion.fromString("1.7.2"));
+
+ SERVICE_TOP_CALLABLE_VER1 = Collections.unmodifiableSet(versions);
+ }
+
/**
* @param ctx Kernal context.
*/
@@ -668,9 +700,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
ClusterNode node = cache.affinity().mapKeyToNode(name);
if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) {
+ final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
+
+ call.serialize = SERVICE_TOP_CALLABLE_VER1.contains(node.version());
+
return ctx.closure().callAsyncNoFailover(
GridClosureCallMode.BROADCAST,
- new ServiceTopologyCallable(name),
+ call,
Collections.singletonList(node),
false
).get();
@@ -1829,6 +1865,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/**
*/
@GridInternal
+ @SerializableTransient(methodName = "serializableTransient")
private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> {
/** */
private static final long serialVersionUID = 0L;
@@ -1837,10 +1874,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7");
/** */
+ private static final String[] SER_FIELDS = {"waitedCacheInit", "jCtx", "log"};
+
+ /** */
private final String svcName;
/** */
- private boolean waitedCacheInit;
+ private transient boolean waitedCacheInit;
/** */
@IgniteInstanceResource
@@ -1848,11 +1888,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/** */
@JobContextResource
- private ComputeJobContext jCtx;
+ private transient ComputeJobContext jCtx;
/** */
@LoggerResource
- private IgniteLogger log;
+ private transient IgniteLogger log;
+
+ /** */
+ transient boolean serialize;
/**
* @param svcName Service name.
@@ -1898,6 +1941,16 @@ public class GridServiceProcessor extends GridProcessorAdapter {
return serviceTopology(cache, svcName);
}
+
+ /**
+ * @param self Instance of current class before serialization.
+ * @param ver Sender job version.
+ * @return List of serializable transient fields.
+ */
+ @SuppressWarnings("unused")
+ private static String[] serializableTransient(ServiceTopologyCallable self, IgniteProductVersion ver) {
+ return (self != null && self.serialize) || (ver != null && SERVICE_TOP_CALLABLE_VER1.contains(ver)) ? SER_FIELDS : null;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
new file mode 100644
index 0000000..14a2f27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks class as it has transient fields that should be serialized.
+ * Annotated class must have method that returns list of transient
+ * fields that should be serialized.
+ * <p>
+ * Works only for jobs. For other messages node version is not available.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface SerializableTransient {
+ /**
+ * Name of the private static method that returns list of transient fields
+ * that should be serialized (String[]), and accepts itself (before serialization)
+ * and {@link IgniteProductVersion}, e.g.
+ * <pre>
+ * private static String[] fields(Object self, IgniteProductVersion ver){
+ * return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null;
+ * }
+ * </pre>
+ * <p>
+ * On serialization version argument <tt>ver</tt> is null, on deserialization - <tt>self</tt> is null.
+ * </p>
+ * <p>
+ * If it returns empty array or null all transient fields will be normally
+ * ignored.
+ * </p>
+ *
+ * @return Name of the method.
+ */
+ String methodName();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
index 9668baf..ad63702 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.marshaller;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -24,6 +25,9 @@ import org.jetbrains.annotations.Nullable;
* Utility marshaller methods.
*/
public class MarshallerUtils {
+ /** Job sender node version. */
+ private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = new ThreadLocal<>();
+
/**
* Set node name to marshaller context if possible.
*
@@ -55,4 +59,22 @@ public class MarshallerUtils {
private MarshallerUtils() {
// No-op.
}
+
+ /**
+ * Sets thread local job sender node version.
+ *
+ * @param ver Thread local job sender node version.
+ */
+ public static void jobSenderVersion(IgniteProductVersion ver) {
+ JOB_SND_NODE_VER.set(ver);
+ }
+
+ /**
+ * Returns thread local job sender node version.
+ *
+ * @return Thread local job sender node version.
+ */
+ public static IgniteProductVersion jobSenderVersion() {
+ return JOB_SND_NODE_VER.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
index 5a5b54d..160f2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
@@ -47,8 +47,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.marshaller.MarshallerExclusions;
+import org.apache.ignite.internal.util.SerializableTransient;
+import org.apache.ignite.marshaller.MarshallerUtils;
import static java.lang.reflect.Modifier.isFinal;
import static java.lang.reflect.Modifier.isPrivate;
@@ -166,6 +169,9 @@ class OptimizedClassDescriptor {
/** Proxy interfaces. */
private Class<?>[] proxyIntfs;
+ /** Method returns serializable transient fields. */
+ private Method serTransMtd;
+
/**
* Creates descriptor for class.
*
@@ -441,6 +447,27 @@ class OptimizedClassDescriptor {
readObjMtds.add(mtd);
+ final SerializableTransient serTransAn = c.getAnnotation(SerializableTransient.class);
+
+ // Custom serialization policy for transient fields.
+ if (serTransAn != null) {
+ try {
+ serTransMtd = c.getDeclaredMethod(serTransAn.methodName(), cls, IgniteProductVersion.class);
+
+ int mod = serTransMtd.getModifiers();
+
+ if (isStatic(mod) && isPrivate(mod)
+ && serTransMtd.getReturnType() == String[].class)
+ serTransMtd.setAccessible(true);
+ else
+ // Set method back to null if it has incorrect signature.
+ serTransMtd = null;
+ }
+ catch (NoSuchMethodException ignored) {
+ serTransMtd = null;
+ }
+ }
+
Field[] clsFields0 = c.getDeclaredFields();
Map<String, Field> fieldNames = new HashMap<>();
@@ -797,7 +824,7 @@ class OptimizedClassDescriptor {
writeTypeData(out);
out.writeShort(checksum);
- out.writeSerializable(obj, writeObjMtds, fields);
+ out.writeSerializable(obj, writeObjMtds, serializableFields(obj.getClass(), obj, null));
break;
@@ -807,6 +834,60 @@ class OptimizedClassDescriptor {
}
/**
+ * Gets list of serializable fields. If {@link #serTransMtd} method
+ * returns list of transient fields, they will be added to other fields.
+ * Transient fields that are not included in that list will be normally
+ * ignored.
+ *
+ * @param cls Class.
+ * @param obj Object.
+ * @param ver Job sender version.
+ * @return Serializable fields.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private Fields serializableFields(Class<?> cls, Object obj, IgniteProductVersion ver) {
+ if (serTransMtd == null)
+ return fields;
+
+ try {
+ final String[] transFields = (String[])serTransMtd.invoke(cls, obj, ver);
+
+ if (transFields == null || transFields.length == 0)
+ return fields;
+
+ List<FieldInfo> clsFields = new ArrayList<>();
+
+ clsFields.addAll(fields.fields.get(0).fields);
+
+ for (int i = 0; i < transFields.length; i++) {
+ final String fieldName = transFields[i];
+
+ final Field f = cls.getDeclaredField(fieldName);
+
+ FieldInfo fieldInfo = new FieldInfo(f, f.getName(),
+ GridUnsafe.objectFieldOffset(f), fieldType(f.getType()));
+
+ clsFields.add(fieldInfo);
+ }
+
+ Collections.sort(clsFields, new Comparator<FieldInfo>() {
+ @Override public int compare(FieldInfo t1, FieldInfo t2) {
+ return t1.name().compareTo(t2.name());
+ }
+ });
+
+ List<ClassFields> fields = new ArrayList<>();
+
+ fields.add(new ClassFields(clsFields));
+
+ return new Fields(fields);
+ }
+ catch (Exception e) {
+ return fields;
+ }
+ }
+
+ /**
* @param out Output stream.
* @throws IOException In case of error.
*/
@@ -838,7 +919,12 @@ class OptimizedClassDescriptor {
case SERIALIZABLE:
verifyChecksum(in.readShort());
- return in.readSerializable(cls, readObjMtds, readResolveMtd, fields);
+ // If no serialize method, then unmarshal as usual.
+ if (serTransMtd != null)
+ return in.readSerializable(cls, readObjMtds, readResolveMtd,
+ serializableFields(cls, null, MarshallerUtils.jobSenderVersion()));
+ else
+ return in.readSerializable(cls, readObjMtds, readResolveMtd, fields);
default:
assert false : "Unexpected type: " + type;