You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/03 18:42:40 UTC
incubator-ignite git commit: #ignite-964: all tests works.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-964-1 1f9fd7736 -> 2105d7222
#ignite-964: all tests works.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2105d722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2105d722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2105d722
Branch: refs/heads/ignite-964-1
Commit: 2105d72229098f7cf576b0b68a4dd3250fb0a540
Parents: 1f9fd77
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 3 19:42:20 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 3 19:42:20 2015 +0300
----------------------------------------------------------------------
.../processors/rest/GridRestProcessor.java | 32 +-
.../IgniteScriptingCommandHandler.java | 330 ------------------
.../rest/handlers/scripting/NodeJSIgnite.java | 2 +-
.../ignite/internal/NodeJsComputeSelfTest.java | 13 +-
.../jetty/IgniteScriptingCommandHandler.java | 336 +++++++++++++++++++
5 files changed, 376 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2105d722/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index c8cb25a..a27327c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.rest.client.message.*;
import org.apache.ignite.internal.processors.rest.handlers.*;
import org.apache.ignite.internal.processors.rest.handlers.cache.*;
import org.apache.ignite.internal.processors.rest.handlers.query.*;
-import org.apache.ignite.internal.processors.rest.handlers.scripting.*;
import org.apache.ignite.internal.processors.rest.handlers.datastructures.*;
import org.apache.ignite.internal.processors.rest.handlers.task.*;
import org.apache.ignite.internal.processors.rest.handlers.top.*;
@@ -59,6 +58,10 @@ public class GridRestProcessor extends GridProcessorAdapter {
private static final String HTTP_PROTO_CLS =
"org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyRestProtocol";
+ /** HTTP protocol class name. */
+ private static final String HTTP_SCRIPTING_CLS =
+ "org.apache.ignite.internal.processors.rest.protocols.http.jetty.IgniteScriptingCommandHandler";
+
/** */
public static final byte[] ZERO_BYTES = new byte[0];
@@ -292,8 +295,8 @@ public class GridRestProcessor extends GridProcessorAdapter {
addHandler(new GridTopologyCommandHandler(ctx));
addHandler(new GridVersionNameCommandHandler(ctx));
addHandler(new DataStructuresCommandHandler(ctx));
- addHandler(new IgniteScriptingCommandHandler(ctx));
addHandler(new QueryCommandHandler(ctx));
+ addScriptingHandler();
}
}
@@ -652,6 +655,31 @@ public class GridRestProcessor extends GridProcessorAdapter {
}
/**
+ * Add scripting handler if exist in classpath.
+ *
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void addScriptingHandler() throws IgniteCheckedException {
+ try {
+ Class<?> cls1 = Class.forName(HTTP_SCRIPTING_CLS);
+
+ Constructor<?> ctor1 = cls1.getConstructor(GridKernalContext.class);
+
+ GridRestCommandHandlerAdapter handl1 = (GridRestCommandHandlerAdapter)ctor1.newInstance(ctx);
+
+ addHandler(handl1);
+ }
+ catch (ClassNotFoundException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add scripting handler (consider adding ignite-rest-http " +
+ "module to classpath).");
+ }
+ catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new IgniteCheckedException("Failed to initialize HTTP REST protocol.", e);
+ }
+ }
+
+ /**
* @return Client configuration.
*/
private ConnectorConfiguration config() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2105d722/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
deleted file mode 100644
index 3c3c5ce..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.rest.handlers.scripting;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.rest.*;
-import org.apache.ignite.internal.processors.rest.handlers.*;
-import org.apache.ignite.internal.processors.rest.request.*;
-import org.apache.ignite.internal.processors.scripting.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
-
-/**
- * Compute task command handler.
- */
-public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter {
- /** Supported commands. */
- private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(
- EXECUTE_MAP_REDUCE_SCRIPT,
- RUN_SCRIPT);
-
- /** Emit result. */
- private IgniteJsEmitResult emitRes;
-
- /**
- * @param ctx Context.
- */
- public IgniteScriptingCommandHandler(GridKernalContext ctx) {
- super(ctx);
-
- try {
- IgniteScriptProcessor script = ctx.scripting();
-
- String emitFunction = "function emit(f, args, nodeId) {" +
- "__emitResult.add(f.toString(), args, nodeId);}";
-
- script.addEngineFunction(emitFunction);
-
- emitRes = new IgniteJsEmitResult();
-
- script.addBinding("__emitResult", emitRes);
-
-
- script.addBinding("ignite", new NodeJSIgnite(ctx.grid()));
- }
- catch (IgniteCheckedException e) {
- ctx.log().error(e.getMessage());
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridRestCommand> supportedCommands() {
- return SUPPORTED_COMMANDS;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<GridRestResponse> handleAsync(GridRestRequest req) {
- assert req != null;
-
- assert SUPPORTED_COMMANDS.contains(req.command());
-
- switch (req.command()) {
- case RUN_SCRIPT: {
- assert req instanceof RestRunScriptRequest : "Invalid type of run script request.";
-
- return ctx.closure().callLocalSafe(new RunScriptCallable(ctx, (RestRunScriptRequest) req), false);
- }
-
- case EXECUTE_MAP_REDUCE_SCRIPT: {
- assert req instanceof RestMapReduceScriptRequest :
- "Invalid type of execute map reduce script request.";
-
- return ctx.closure().callLocalSafe(
- new MapReduceCallable(ctx, (RestMapReduceScriptRequest)req, emitRes));
- }
- }
-
- return new GridFinishedFuture<>();
- }
-
- /**
- * JS Compute Task.
- */
- private static class JsTask extends ComputeTaskAdapter<String, Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Mapping function. */
- private String mapFunc;
-
- /** Reduce function. */
- private String reduceFunc;
-
- /** Kernal context. */
- private GridKernalContext ctx;
-
- /** Map function argument. */
- private Object arg;
-
- /** Emit results. */
- private IgniteJsEmitResult emitRes;
-
- /**
- * @param mapFunc Map function.
- * @param arg Map function argument.
- * @param reduceFunc Reduce function.
- * @param ctx Kernal context.
- */
- public JsTask(String mapFunc, Object arg, String reduceFunc, GridKernalContext ctx, IgniteJsEmitResult emitRes) {
- this.mapFunc = mapFunc;
- this.reduceFunc = reduceFunc;
- this.arg = arg;
- this.ctx = ctx;
- this.emitRes = emitRes;
- }
-
- /** {@inheritDoc} */
- @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) {
- try {
- Map<ComputeJob, ClusterNode> map = new HashMap<>();
-
- ctx.scripting().invokeFunction(mapFunc, nodes.toArray(new ClusterNode[nodes.size()]), this.arg);
-
- List<T3<Object, Object, Object>> jsMapRes = emitRes.getEmitResult();
-
- for (T3<Object, Object, Object> task : jsMapRes) {
- map.put(new JsCallFunctionJob((String)task.get1(), task.get2()),
- (ClusterNode)task.get3());
- }
-
- return map;
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Object reduce(List<ComputeJobResult> results) {
- try {
- Object[] data = new Object[results.size()];
-
- for (int i = 0; i < results.size(); ++i) {
- IgniteException err = results.get(i).getException();
-
- if (err != null)
- return new GridRestResponse(GridRestResponse.STATUS_FAILED, err.getMessage());
-
- data[i] = results.get(i).getData();
- }
-
- Object o = ctx.scripting().invokeFunction(reduceFunc, data, null);
- return o;
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
- }
-
- /**
- * Js call function job.
- */
- private static class JsCallFunctionJob extends ComputeJobAdapter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Function to call. */
- private String func;
-
- /** Function argument. */
- private Object argv;
-
- /** Ignite instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /**
- * @param func Function to call.
- * @param argv Function argument.
- */
- public JsCallFunctionJob(String func, Object argv) {
- this.func = func;
-
- this.argv = JSONCacheObject.toSimpleObject(argv);
- }
-
- /** {@inheritDoc} */
- @Override public Object execute() throws IgniteException {
- try {
- return ((IgniteKernal)ignite).context().scripting().invokeFunction(func, argv);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
- }
-
- /**
- * Call java script function.
- */
- private static class JsFunctionCallable implements IgniteCallable<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Function to call. */
- private String func;
-
- /** Function argument. */
- private Object arg;
-
- /** Ignite instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /**
- * @param func Function to call.
- * @param arg Function argument.
- */
- public JsFunctionCallable(String func, Object arg) {
- this.func = func;
- this.arg = arg;
- }
-
- /** {@inheritDoc} */
- @Override public Object call() {
- try {
- return ((IgniteKernal)ignite).context().scripting().invokeFunction(func, arg);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
- }
-
- /**
- * Run script callable.
- */
- private static class RunScriptCallable implements Callable<GridRestResponse> {
- /** Kernal context. */
- private GridKernalContext ctx;
-
- /** Run script request. */
- private RestRunScriptRequest req;
-
- /**
- * @param ctx Kernal context.
- * @param req Run script request.
- */
- public RunScriptCallable(GridKernalContext ctx, RestRunScriptRequest req) {
- this.ctx = ctx;
- this.req = req;
- }
-
- /** {@inheritDoc} */
- @Override public GridRestResponse call() throws Exception {
- try {
- return new GridRestResponse(ctx.grid().compute().call(
- new JsFunctionCallable(req.script(), req.argument())));
- }
- catch (Exception e) {
- return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
- }
- }
- }
-
- /**
- * Map reduce callable.
- */
- private static class MapReduceCallable implements Callable<GridRestResponse> {
- /** Kernal context. */
- private GridKernalContext ctx;
-
- /** Run script request. */
- private RestMapReduceScriptRequest req;
-
- /** Emit results. */
- IgniteJsEmitResult emitRes;
-
- /**
- * @param ctx Kernal context.
- * @param req Run script request.
- * @param emitRes Emit function results.
- */
- public MapReduceCallable(GridKernalContext ctx, RestMapReduceScriptRequest req,IgniteJsEmitResult emitRes) {
- this.ctx = ctx;
- this.req = req;
- this.emitRes = emitRes;
- }
-
- /** {@inheritDoc} */
- @Override public GridRestResponse call() throws Exception {
- try {
- return new GridRestResponse(ctx.grid().compute().execute(
- new JsTask(req.mapFunction(), req.argument(), req.reduceFunction(), ctx, emitRes),
- null));
- }
- catch (Exception e) {
- return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2105d722/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/NodeJSIgnite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/NodeJSIgnite.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/NodeJSIgnite.java
index 777a3aa..3a54f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/NodeJSIgnite.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/NodeJSIgnite.java
@@ -29,7 +29,7 @@ public class NodeJSIgnite {
/**
* @param ignite Ignite.
*/
- NodeJSIgnite(Ignite ignite) {
+ public NodeJSIgnite(Ignite ignite) {
this.ignite = ignite;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2105d722/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java
index a9ff998..6e4e5a4 100644
--- a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java
+++ b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java
@@ -45,6 +45,11 @@ public class NodeJsComputeSelfTest extends NodeJsAbstractTest {
stopAllGrids();
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ grid(0).cache("mycache").removeAll();
+ }
+
/**
* @throws Exception If failed.
*/
@@ -97,11 +102,11 @@ public class NodeJsComputeSelfTest extends NodeJsAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testRestartGrid() throws Exception {
+ public void _testRestartGrid() throws Exception {
final AtomicInteger id = new AtomicInteger(2);
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
- ArrayList<Integer> ids = new ArrayList<Integer>();
+ ArrayList<Integer> ids = new ArrayList<>();
for (int i = 0 ; i < 3; ++i) {
int cur = id.getAndIncrement();
@@ -111,8 +116,8 @@ public class NodeJsComputeSelfTest extends NodeJsAbstractTest {
ids.add(cur);
}
- for (int i = 0; i < ids.size(); ++i)
- stopGrid(ids.get(i));
+ for (Integer id1 : ids)
+ stopGrid(id1);
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2105d722/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/IgniteScriptingCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/IgniteScriptingCommandHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/IgniteScriptingCommandHandler.java
new file mode 100644
index 0000000..7b7273f
--- /dev/null
+++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/IgniteScriptingCommandHandler.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.protocols.http.jetty;
+
+import net.sf.json.*;
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.rest.*;
+import org.apache.ignite.internal.processors.rest.handlers.*;
+import org.apache.ignite.internal.processors.rest.handlers.scripting.*;
+import org.apache.ignite.internal.processors.rest.request.*;
+import org.apache.ignite.internal.processors.scripting.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
+
+/**
+ * Compute task command handler.
+ */
+public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter {
+ /** Supported commands. */
+ private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(
+ EXECUTE_MAP_REDUCE_SCRIPT,
+ RUN_SCRIPT);
+
+ /** Emit result. */
+ private IgniteJsEmitResult emitRes;
+
+ /**
+ * @param ctx Context.
+ */
+ public IgniteScriptingCommandHandler(GridKernalContext ctx) {
+ super(ctx);
+
+ try {
+ IgniteScriptProcessor script = ctx.scripting();
+
+ String emitFunction = "function emit(f, args, nodeId) {" +
+ "__emitResult.add(f.toString(), args, nodeId);}";
+
+ script.addEngineFunction(emitFunction);
+
+ emitRes = new IgniteJsEmitResult();
+
+ script.addBinding("__emitResult", emitRes);
+
+
+ script.addBinding("ignite", new NodeJSIgnite(ctx.grid()));
+ }
+ catch (IgniteCheckedException e) {
+ ctx.log().error(e.getMessage());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridRestCommand> supportedCommands() {
+ return SUPPORTED_COMMANDS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<GridRestResponse> handleAsync(GridRestRequest req) {
+ assert req != null;
+
+ assert SUPPORTED_COMMANDS.contains(req.command());
+
+ switch (req.command()) {
+ case RUN_SCRIPT: {
+ assert req instanceof RestRunScriptRequest : "Invalid type of run script request.";
+
+ return ctx.closure().callLocalSafe(new RunScriptCallable(ctx, (RestRunScriptRequest) req), false);
+ }
+
+ case EXECUTE_MAP_REDUCE_SCRIPT: {
+ assert req instanceof RestMapReduceScriptRequest :
+ "Invalid type of execute map reduce script request.";
+
+ return ctx.closure().callLocalSafe(
+ new MapReduceCallable(ctx, (RestMapReduceScriptRequest)req, emitRes));
+ }
+ }
+
+ return new GridFinishedFuture<>();
+ }
+
+ /**
+ * JS Compute Task.
+ */
+ private static class JsTask extends ComputeTaskAdapter<String, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Mapping function. */
+ private String mapFunc;
+
+ /** Reduce function. */
+ private String reduceFunc;
+
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Map function argument. */
+ private Object arg;
+
+ /** Emit results. */
+ private IgniteJsEmitResult emitRes;
+
+ /**
+ * @param mapFunc Map function.
+ * @param arg Map function argument.
+ * @param reduceFunc Reduce function.
+ * @param ctx Kernal context.
+ */
+ public JsTask(String mapFunc, Object arg, String reduceFunc, GridKernalContext ctx, IgniteJsEmitResult emitRes) {
+ this.mapFunc = mapFunc;
+ this.reduceFunc = reduceFunc;
+ this.arg = arg;
+ this.ctx = ctx;
+ this.emitRes = emitRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) {
+ try {
+ Map<ComputeJob, ClusterNode> map = new HashMap<>();
+
+ ctx.scripting().invokeFunction(mapFunc, nodes.toArray(new ClusterNode[nodes.size()]), this.arg);
+
+ List<T3<Object, Object, Object>> jsMapRes = emitRes.getEmitResult();
+
+ for (T3<Object, Object, Object> task : jsMapRes) {
+ map.put(new JsCallFunctionJob((String)task.get1(), task.get2()),
+ (ClusterNode)task.get3());
+ }
+
+ return map;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object reduce(List<ComputeJobResult> results) {
+ try {
+ Object[] data = new Object[results.size()];
+
+ for (int i = 0; i < results.size(); ++i) {
+ IgniteException err = results.get(i).getException();
+
+ if (err != null)
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED, err.getMessage());
+
+ data[i] = results.get(i).getData();
+ }
+
+ Object o = ctx.scripting().invokeJSFunction(reduceFunc, JSONSerializer.toJSON(data), null);
+ return o;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ }
+
+ /**
+ * Js call function job.
+ */
+ private static class JsCallFunctionJob extends ComputeJobAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Function to call. */
+ private String func;
+
+ /** Function argument. */
+ private Object argv;
+
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ * @param func Function to call.
+ * @param argv Function argument.
+ */
+ public JsCallFunctionJob(String func, Object argv) {
+ this.func = func;
+
+ this.argv = JSONCacheObject.toSimpleObject(argv);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ try {
+ return ((IgniteKernal)ignite).context().scripting().invokeFunction(func,
+ JSONCacheObject.toSimpleObject(argv), null);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * Call java script function.
+ */
+ private static class JsFunctionCallable implements IgniteCallable<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Function to call. */
+ private String func;
+
+ /** Function argument. */
+ private Object arg;
+
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ * @param func Function to call.
+ * @param arg Function argument.
+ */
+ public JsFunctionCallable(String func, Object arg) {
+ this.func = func;
+ this.arg = arg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() {
+ try {
+ return ((IgniteKernal)ignite).context().scripting().invokeFunction(func, arg);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ }
+
+ /**
+ * Run script callable.
+ */
+ private static class RunScriptCallable implements Callable<GridRestResponse> {
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Run script request. */
+ private RestRunScriptRequest req;
+
+ /**
+ * @param ctx Kernal context.
+ * @param req Run script request.
+ */
+ public RunScriptCallable(GridKernalContext ctx, RestRunScriptRequest req) {
+ this.ctx = ctx;
+ this.req = req;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridRestResponse call() throws Exception {
+ try {
+ return new GridRestResponse(ctx.grid().compute().call(
+ new JsFunctionCallable(req.script(), req.argument())));
+ }
+ catch (Exception e) {
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Map reduce callable.
+ */
+ private static class MapReduceCallable implements Callable<GridRestResponse> {
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Run script request. */
+ private RestMapReduceScriptRequest req;
+
+ /** Emit results. */
+ IgniteJsEmitResult emitRes;
+
+ /**
+ * @param ctx Kernal context.
+ * @param req Run script request.
+ * @param emitRes Emit function results.
+ */
+ public MapReduceCallable(GridKernalContext ctx, RestMapReduceScriptRequest req,IgniteJsEmitResult emitRes) {
+ this.ctx = ctx;
+ this.req = req;
+ this.emitRes = emitRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridRestResponse call() throws Exception {
+ try {
+ return new GridRestResponse(ctx.grid().compute().execute(
+ new JsTask(req.mapFunction(), req.argument(), req.reduceFunction(), ctx, emitRes),
+ null));
+ }
+ catch (Exception e) {
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
+ }
+ }
+ }
+}