You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:15:51 UTC

[11/50] [abbrv] ignite git commit: IGNITE-4231 - Hangs on compute result serialization error. Fix

IGNITE-4231 - Hangs on compute result serialization error. Fix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d744db2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d744db2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d744db2

Branch: refs/heads/master
Commit: 6d744db27111c68e13b06ec99428a4c4148d97b6
Parents: ceb60d2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Dec 12 11:44:57 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Dec 12 11:44:57 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/job/GridJobWorker.java  |  69 +++++++-
 .../closure/GridClosureSerializationTest.java   | 177 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 3 files changed, 241 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5f38b29..f5c6a27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.job;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -790,6 +791,64 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                     }
                     else {
                         try {
+                            byte[] resBytes = null;
+                            byte[] exBytes = null;
+                            byte[] attrBytes = null;
+
+                            boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
+
+                            Map<Object, Object> attrs = jobCtx.getAttributes();
+
+                            // Try serialize response, and if exception - return to client.
+                            if (!loc) {
+                                try {
+                                    resBytes = U.marshal(marsh, res);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    resBytes = U.marshal(marsh, null);
+
+                                    if (ex != null)
+                                        ex.addSuppressed(e);
+                                    else
+                                        ex = U.convertException(e);
+
+                                    U.error(log, "Failed to serialize job response [nodeId=" + taskNode.id() +
+                                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
+                                        ", resCls=" + (res == null ? null : res.getClass()) + ']', e);
+                                }
+
+                                try {
+                                    attrBytes = U.marshal(marsh, attrs);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    attrBytes = U.marshal(marsh, Collections.emptyMap());
+
+                                    if (ex != null)
+                                        ex.addSuppressed(e);
+                                    else
+                                        ex = U.convertException(e);
+
+                                    U.error(log, "Failed to serialize job attributes [nodeId=" + taskNode.id() +
+                                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
+                                        ", attrs=" + attrs + ']', e);
+                                }
+
+                                try {
+                                    exBytes = U.marshal(marsh, ex);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() +
+                                        ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
+                                        ", msg=\"" + e.getMessage() + "\"]";
+
+                                    ex = new IgniteException(msg);
+
+                                    U.error(log, msg, e);
+
+                                    exBytes = U.marshal(marsh, ex);
+                                }
+                            }
+
                             if (ex != null) {
                                 if (isStarted) {
                                     // Job failed.
@@ -804,19 +863,15 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                             else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
                                 evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
 
-                            boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
-
-                            Map<Object, Object> attrs = jobCtx.getAttributes();
-
                             GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
                                 ctx.localNodeId(),
                                 ses.getId(),
                                 ses.getJobId(),
-                                loc ? null : U.marshal(marsh, ex),
+                                exBytes,
                                 loc ? ex : null,
-                                loc ? null: U.marshal(marsh, res),
+                                resBytes,
                                 loc ? res : null,
-                                loc ? null : U.marshal(marsh, attrs),
+                                attrBytes,
                                 loc ? attrs : null,
                                 isCancelled(),
                                 retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java
new file mode 100644
index 0000000..2426dd7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests handling of job result serialization error.
+ */
+public class GridClosureSerializationTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(null);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+        startGrid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+    public void testSerializationFailure() throws Exception {
+        final IgniteEx ignite0 = grid(0);
+        final IgniteEx ignite1 = grid(1);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() {
+                    @Override public Object call() throws Exception {
+                        return new CaseClass.CaseClass2();
+                    }
+                });
+
+                return null;
+            }
+        }, BinaryObjectException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+    public void testExceptionSerializationFailure() throws Exception {
+        final IgniteEx ignite0 = grid(0);
+        final IgniteEx ignite1 = grid(1);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() {
+                    @Override public Object call() throws Exception {
+                        throw new BrokenException();
+                    }
+                });
+
+                return null;
+            }
+        }, IgniteException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+    public void testAttributesSerializationFailure() throws Exception {
+        final IgniteEx ignite0 = grid(0);
+        final IgniteEx ignite1 = grid(1);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @JobContextResource
+            private ComputeJobContext jobCtx;
+
+            @Override public Object call() throws Exception {
+                ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() {
+                    @Override public Object call() throws Exception {
+                        jobCtx.setAttribute("test-attr", new BrokenAttribute());
+
+                        return null;
+                    }
+                });
+
+                return null;
+            }
+        }, IgniteException.class, null);
+    }
+
+    /**
+     * Binary marshaller will fail because subclass defines other field with different case.
+     */
+    @SuppressWarnings("unused")
+    private static class CaseClass {
+        /** */
+        private String val;
+
+        /**
+         *
+         */
+        private static class CaseClass2 extends CaseClass {
+            /** */
+            private String vAl;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class BrokenAttribute implements Externalizable {
+        /** {@inheritDoc} */
+        @Override public void writeExternal(final ObjectOutput out) throws IOException {
+            throw new IOException("Test exception");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new IOException("Test exception");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class BrokenException extends Exception implements Externalizable {
+        /** {@inheritDoc} */
+        @Override public void writeExternal(final ObjectOutput out) throws IOException {
+            throw new IOException("Test exception");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new IOException("Test exception");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 6ab0885..1c1fcf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCac
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest;
 import org.apache.ignite.internal.processors.cache.OffHeapTieredTransactionSelfTest;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest;
+import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest;
 import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
 import org.apache.ignite.internal.processors.continuous.GridMessageListenSelfTest;
 import org.apache.ignite.internal.processors.odbc.OdbcProcessorValidationSelfTest;
@@ -109,6 +110,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(GridProductVersionSelfTest.class);
         suite.addTestSuite(GridAffinityProcessorRendezvousSelfTest.class);
         suite.addTestSuite(GridClosureProcessorSelfTest.class);
+        suite.addTestSuite(GridClosureSerializationTest.class);
         suite.addTestSuite(ClosureServiceClientsNodesTest.class);
         suite.addTestSuite(GridStartStopSelfTest.class);
         suite.addTestSuite(GridProjectionForCachesSelfTest.class);