You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:15:51 UTC
[11/50] [abbrv] ignite git commit: IGNITE-4231 - Hangs on compute
result serialization error. Fix
IGNITE-4231 - Hangs on compute result serialization error. Fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d744db2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d744db2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d744db2
Branch: refs/heads/master
Commit: 6d744db27111c68e13b06ec99428a4c4148d97b6
Parents: ceb60d2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Dec 12 11:44:57 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Dec 12 11:44:57 2016 +0300
----------------------------------------------------------------------
.../internal/processors/job/GridJobWorker.java | 69 +++++++-
.../closure/GridClosureSerializationTest.java | 177 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
3 files changed, 241 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5f38b29..f5c6a27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.job;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -790,6 +791,64 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
}
else {
try {
+ byte[] resBytes = null;
+ byte[] exBytes = null;
+ byte[] attrBytes = null;
+
+ boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
+
+ Map<Object, Object> attrs = jobCtx.getAttributes();
+
+ // Try to serialize the response; if that fails, send the serialization error back to the client instead.
+ if (!loc) {
+ try {
+ resBytes = U.marshal(marsh, res);
+ }
+ catch (IgniteCheckedException e) {
+ resBytes = U.marshal(marsh, null);
+
+ if (ex != null)
+ ex.addSuppressed(e);
+ else
+ ex = U.convertException(e);
+
+ U.error(log, "Failed to serialize job response [nodeId=" + taskNode.id() +
+ ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
+ ", resCls=" + (res == null ? null : res.getClass()) + ']', e);
+ }
+
+ try {
+ attrBytes = U.marshal(marsh, attrs);
+ }
+ catch (IgniteCheckedException e) {
+ attrBytes = U.marshal(marsh, Collections.emptyMap());
+
+ if (ex != null)
+ ex.addSuppressed(e);
+ else
+ ex = U.convertException(e);
+
+ U.error(log, "Failed to serialize job attributes [nodeId=" + taskNode.id() +
+ ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
+ ", attrs=" + attrs + ']', e);
+ }
+
+ try {
+ exBytes = U.marshal(marsh, ex);
+ }
+ catch (IgniteCheckedException e) {
+ String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() +
+ ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job +
+ ", msg=\"" + e.getMessage() + "\"]";
+
+ ex = new IgniteException(msg);
+
+ U.error(log, msg, e);
+
+ exBytes = U.marshal(marsh, ex);
+ }
+ }
+
+
if (ex != null) {
if (isStarted) {
// Job failed.
@@ -804,19 +863,15 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
- boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
-
- Map<Object, Object> attrs = jobCtx.getAttributes();
-
GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
ctx.localNodeId(),
ses.getId(),
ses.getJobId(),
- loc ? null : U.marshal(marsh, ex),
+ exBytes,
loc ? ex : null,
- loc ? null: U.marshal(marsh, res),
+ resBytes,
loc ? res : null,
- loc ? null : U.marshal(marsh, attrs),
+ attrBytes,
loc ? attrs : null,
isCancelled(),
retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java
new file mode 100644
index 0000000..2426dd7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests handling of serialization errors for job results, job exceptions and job context attributes.
+ */
+public class GridClosureSerializationTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(null);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0);
+ startGrid(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+ public void testSerializationFailure() throws Exception {
+ final IgniteEx ignite0 = grid(0);
+ final IgniteEx ignite1 = grid(1);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ return new CaseClass.CaseClass2();
+ }
+ });
+
+ return null;
+ }
+ }, BinaryObjectException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+ public void testExceptionSerializationFailure() throws Exception {
+ final IgniteEx ignite0 = grid(0);
+ final IgniteEx ignite1 = grid(1);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ throw new BrokenException();
+ }
+ });
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+ public void testAttributesSerializationFailure() throws Exception {
+ final IgniteEx ignite0 = grid(0);
+ final IgniteEx ignite1 = grid(1);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @JobContextResource
+ private ComputeJobContext jobCtx;
+
+ @Override public Object call() throws Exception {
+ ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ jobCtx.setAttribute("test-attr", new BrokenAttribute());
+
+ return null;
+ }
+ });
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * The binary marshaller fails on this hierarchy because the subclass declares a field whose name differs only by case.
+ */
+ @SuppressWarnings("unused")
+ private static class CaseClass {
+ /** Field whose name differs from {@code CaseClass2.vAl} only by letter case. */
+ private String val;
+
+ /**
+ * Subclass returned as the job result in {@code testSerializationFailure}.
+ */
+ private static class CaseClass2 extends CaseClass {
+ /** Field whose name differs from the parent's {@code val} only by letter case. */
+ private String vAl;
+ }
+ }
+
+ /**
+ * Job attribute whose {@code writeExternal} and {@code readExternal} always throw {@link IOException}.
+ */
+ private static class BrokenAttribute implements Externalizable {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(final ObjectOutput out) throws IOException {
+ throw new IOException("Test exception");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new IOException("Test exception");
+ }
+ }
+
+ /**
+ * Job exception whose {@code writeExternal} and {@code readExternal} always throw {@link IOException}.
+ */
+ private static class BrokenException extends Exception implements Externalizable {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(final ObjectOutput out) throws IOException {
+ throw new IOException("Test exception");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new IOException("Test exception");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 6ab0885..1c1fcf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCac
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest;
import org.apache.ignite.internal.processors.cache.OffHeapTieredTransactionSelfTest;
import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest;
+import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest;
import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
import org.apache.ignite.internal.processors.continuous.GridMessageListenSelfTest;
import org.apache.ignite.internal.processors.odbc.OdbcProcessorValidationSelfTest;
@@ -109,6 +110,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(GridProductVersionSelfTest.class);
suite.addTestSuite(GridAffinityProcessorRendezvousSelfTest.class);
suite.addTestSuite(GridClosureProcessorSelfTest.class);
+ suite.addTestSuite(GridClosureSerializationTest.class);
suite.addTestSuite(ClosureServiceClientsNodesTest.class);
suite.addTestSuite(GridStartStopSelfTest.class);
suite.addTestSuite(GridProjectionForCachesSelfTest.class);