You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/24 13:24:19 UTC

[09/50] [abbrv] ignite git commit: GG-11655 - Restore service compatibility with releases before 1.5.30.

GG-11655 - Restore service compatibility with releases before 1.5.30.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/950bad47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/950bad47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/950bad47

Branch: refs/heads/ignite-4242
Commit: 950bad474ef29f9b808e74034c49a69d57eb2740
Parents: 175da6b
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Nov 8 14:03:34 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Nov 8 14:03:34 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/job/GridJobWorker.java  | 10 ++-
 .../service/GridServiceProcessor.java           | 61 ++++++++++++-
 .../internal/util/SerializableTransient.java    | 58 +++++++++++++
 .../ignite/marshaller/MarshallerUtils.java      | 22 +++++
 .../optimized/OptimizedClassDescriptor.java     | 90 +++++++++++++++++++-
 5 files changed, 234 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 8169eb1..5f38b29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -57,6 +57,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
@@ -421,7 +422,14 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
         try {
             if (job == null) {
-                job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                MarshallerUtils.jobSenderVersion(taskNode.version());
+
+                try {
+                    job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                }
+                finally {
+                    MarshallerUtils.jobSenderVersion(null);
+                }
 
                 // No need to hold reference any more.
                 jobBytes = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 527d360..6c26363 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.service;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -87,6 +90,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.internal.util.SerializableTransient;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
 import org.apache.ignite.resources.LoggerResource;
@@ -115,6 +119,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** */
     public static final IgniteProductVersion LAZY_SERVICES_CFG_SINCE = IgniteProductVersion.fromString("1.5.22");
 
+    /** Versions that only compatible with each other, and from 1.5.33. */
+    private static final Set<IgniteProductVersion> SERVICE_TOP_CALLABLE_VER1;
+
     /** */
     private final Boolean srvcCompatibilitySysProp;
 
@@ -162,6 +169,31 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
+    static {
+        Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() {
+            @Override public int compare(final IgniteProductVersion o1, final IgniteProductVersion o2) {
+                return o1.compareToIgnoreTimestamp(o2);
+            }
+        });
+
+        versions.add(IgniteProductVersion.fromString("1.5.30"));
+        versions.add(IgniteProductVersion.fromString("1.5.31"));
+        versions.add(IgniteProductVersion.fromString("1.5.32"));
+        versions.add(IgniteProductVersion.fromString("1.6.3"));
+        versions.add(IgniteProductVersion.fromString("1.6.4"));
+        versions.add(IgniteProductVersion.fromString("1.6.5"));
+        versions.add(IgniteProductVersion.fromString("1.6.6"));
+        versions.add(IgniteProductVersion.fromString("1.6.7"));
+        versions.add(IgniteProductVersion.fromString("1.6.8"));
+        versions.add(IgniteProductVersion.fromString("1.6.9"));
+        versions.add(IgniteProductVersion.fromString("1.6.10"));
+        versions.add(IgniteProductVersion.fromString("1.7.0"));
+        versions.add(IgniteProductVersion.fromString("1.7.1"));
+        versions.add(IgniteProductVersion.fromString("1.7.2"));
+
+        SERVICE_TOP_CALLABLE_VER1 = Collections.unmodifiableSet(versions);
+    }
+
     /**
      * @param ctx Kernal context.
      */
@@ -668,9 +700,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         ClusterNode node = cache.affinity().mapKeyToNode(name);
 
         if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) {
+            final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
+
+            call.serialize = SERVICE_TOP_CALLABLE_VER1.contains(node.version());
+
             return ctx.closure().callAsyncNoFailover(
                 GridClosureCallMode.BROADCAST,
-                new ServiceTopologyCallable(name),
+                call,
                 Collections.singletonList(node),
                 false
             ).get();
@@ -1829,6 +1865,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      */
     @GridInternal
+    @SerializableTransient(methodName = "serializableTransient")
     private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1837,10 +1874,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7");
 
         /** */
+        private static final String[] SER_FIELDS = {"waitedCacheInit", "jCtx", "log"};
+
+        /** */
         private final String svcName;
 
         /** */
-        private boolean waitedCacheInit;
+        private transient boolean waitedCacheInit;
 
         /** */
         @IgniteInstanceResource
@@ -1848,11 +1888,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         /** */
         @JobContextResource
-        private ComputeJobContext jCtx;
+        private transient ComputeJobContext jCtx;
 
         /** */
         @LoggerResource
-        private IgniteLogger log;
+        private transient IgniteLogger log;
+
+        /** */
+        transient boolean serialize;
 
         /**
          * @param svcName Service name.
@@ -1898,6 +1941,16 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
             return serviceTopology(cache, svcName);
         }
+
+        /**
+         * @param self Instance of current class before serialization.
+         * @param ver Sender job version.
+         * @return List of serializable transient fields.
+         */
+        @SuppressWarnings("unused")
+        private static String[] serializableTransient(ServiceTopologyCallable self, IgniteProductVersion ver) {
+            return (self != null && self.serialize) || (ver != null && SERVICE_TOP_CALLABLE_VER1.contains(ver)) ? SER_FIELDS : null;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
new file mode 100644
index 0000000..14a2f27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks class as it has transient fields that should be serialized.
+ * Annotated class must have method that returns list of transient
+ * fields that should be serialized.
+ * <p>
+ *     Works only for jobs. For other messages node version is not available.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface SerializableTransient {
+    /**
+     * Name of the private static method that returns list of transient fields
+     * that should be serialized (String[]), and accepts itself (before serialization)
+     * and {@link IgniteProductVersion}, e.g.
+     * <pre>
+     *     private static String[] fields(Object self, IgniteProductVersion ver){
+     *         return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null;
+     *     }
+     * </pre>
+     * <p>
+     *     On serialization version argument <tt>ver</tt> is null, on deserialization - <tt>self</tt> is null.
+     * </p>
+     * <p>
+     *     If it returns empty array or null all transient fields will be normally
+     *     ignored.
+     * </p>
+     *
+     * @return Name of the method.
+     */
+    String methodName();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
index 9668baf..ad63702 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.marshaller;
 
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.jetbrains.annotations.Nullable;
 
@@ -24,6 +25,9 @@ import org.jetbrains.annotations.Nullable;
  * Utility marshaller methods.
  */
 public class MarshallerUtils {
+    /** Job sender node version. */
+    private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = new ThreadLocal<>();
+
     /**
      * Set node name to marshaller context if possible.
      *
@@ -55,4 +59,22 @@ public class MarshallerUtils {
     private MarshallerUtils() {
         // No-op.
     }
+
+    /**
+     * Sets thread local job sender node version.
+     *
+     * @param ver Thread local job sender node version.
+     */
+    public static void jobSenderVersion(IgniteProductVersion ver) {
+        JOB_SND_NODE_VER.set(ver);
+    }
+
+    /**
+     * Returns thread local job sender node version.
+     *
+     * @return Thread local job sender node version.
+     */
+    public static IgniteProductVersion jobSenderVersion() {
+        return JOB_SND_NODE_VER.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
index 5a5b54d..160f2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
@@ -47,8 +47,11 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.MarshallerContext;
 import org.apache.ignite.marshaller.MarshallerExclusions;
+import org.apache.ignite.internal.util.SerializableTransient;
+import org.apache.ignite.marshaller.MarshallerUtils;
 
 import static java.lang.reflect.Modifier.isFinal;
 import static java.lang.reflect.Modifier.isPrivate;
@@ -166,6 +169,9 @@ class OptimizedClassDescriptor {
     /** Proxy interfaces. */
     private Class<?>[] proxyIntfs;
 
+    /** Method returns serializable transient fields. */
+    private Method serTransMtd;
+
     /**
      * Creates descriptor for class.
      *
@@ -441,6 +447,27 @@ class OptimizedClassDescriptor {
 
                         readObjMtds.add(mtd);
 
+                        final SerializableTransient serTransAn = c.getAnnotation(SerializableTransient.class);
+
+                        // Custom serialization policy for transient fields.
+                        if (serTransAn != null) {
+                            try {
+                                serTransMtd = c.getDeclaredMethod(serTransAn.methodName(), cls, IgniteProductVersion.class);
+
+                                int mod = serTransMtd.getModifiers();
+
+                                if (isStatic(mod) && isPrivate(mod)
+                                    && serTransMtd.getReturnType() == String[].class)
+                                    serTransMtd.setAccessible(true);
+                                else
+                                    // Set method back to null if it has incorrect signature.
+                                    serTransMtd = null;
+                            }
+                            catch (NoSuchMethodException ignored) {
+                                serTransMtd = null;
+                            }
+                        }
+
                         Field[] clsFields0 = c.getDeclaredFields();
 
                         Map<String, Field> fieldNames = new HashMap<>();
@@ -797,7 +824,7 @@ class OptimizedClassDescriptor {
                 writeTypeData(out);
 
                 out.writeShort(checksum);
-                out.writeSerializable(obj, writeObjMtds, fields);
+                out.writeSerializable(obj, writeObjMtds, serializableFields(obj.getClass(), obj, null));
 
                 break;
 
@@ -807,6 +834,60 @@ class OptimizedClassDescriptor {
     }
 
     /**
+     * Gets list of serializable fields. If {@link #serTransMtd} method
+     * returns list of transient fields, they will be added to other fields.
+     * Transient fields that are not included in that list will be normally
+     * ignored.
+     *
+     * @param cls Class.
+     * @param obj Object.
+     * @param ver Job sender version.
+     * @return Serializable fields.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private Fields serializableFields(Class<?> cls, Object obj, IgniteProductVersion ver) {
+        if (serTransMtd == null)
+            return fields;
+
+        try {
+            final String[] transFields = (String[])serTransMtd.invoke(cls, obj, ver);
+
+            if (transFields == null || transFields.length == 0)
+                return fields;
+
+            List<FieldInfo> clsFields = new ArrayList<>();
+
+            clsFields.addAll(fields.fields.get(0).fields);
+
+            for (int i = 0; i < transFields.length; i++) {
+                final String fieldName = transFields[i];
+
+                final Field f = cls.getDeclaredField(fieldName);
+
+                FieldInfo fieldInfo = new FieldInfo(f, f.getName(),
+                    GridUnsafe.objectFieldOffset(f), fieldType(f.getType()));
+
+                clsFields.add(fieldInfo);
+            }
+
+            Collections.sort(clsFields, new Comparator<FieldInfo>() {
+                @Override public int compare(FieldInfo t1, FieldInfo t2) {
+                    return t1.name().compareTo(t2.name());
+                }
+            });
+
+            List<ClassFields> fields = new ArrayList<>();
+
+            fields.add(new ClassFields(clsFields));
+
+            return new Fields(fields);
+        }
+        catch (Exception e) {
+            return fields;
+        }
+    }
+
+    /**
      * @param out Output stream.
      * @throws IOException In case of error.
      */
@@ -838,7 +919,12 @@ class OptimizedClassDescriptor {
             case SERIALIZABLE:
                 verifyChecksum(in.readShort());
 
-                return in.readSerializable(cls, readObjMtds, readResolveMtd, fields);
+                // If no serialize method, then unmarshal as usual.
+                if (serTransMtd != null)
+                    return in.readSerializable(cls, readObjMtds, readResolveMtd,
+                        serializableFields(cls, null, MarshallerUtils.jobSenderVersion()));
+                else
+                    return in.readSerializable(cls, readObjMtds, readResolveMtd, fields);
 
             default:
                 assert false : "Unexpected type: " + type;