You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/08/15 15:15:49 UTC
[2/2] ignite git commit: IGNITE-6284 Add
IgniteCompute.withTaskNoResultCache() with similar effect as
@ComputeTaskNoResultCache - Fixes #4075.
IGNITE-6284 Add IgniteCompute.withTaskNoResultCache() with similar effect as @ComputeTaskNoResultCache - Fixes #4075.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1691af8d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1691af8d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1691af8d
Branch: refs/heads/master
Commit: 1691af8dd165c049258673e14771de36c2feaaec
Parents: 2e9ca8f
Author: EdShangGG <es...@gridgain.com>
Authored: Wed Aug 15 18:14:55 2018 +0300
Committer: Dmitriy Pavlov <dp...@apache.org>
Committed: Wed Aug 15 18:14:55 2018 +0300
----------------------------------------------------------------------
.../mapper/GridContinuousMapperLoadTest1.java | 51 ------
.../mapper/GridContinuousMapperLoadTest2.java | 92 -----------
.../mapper/GridContinuousMapperTask1.java | 157 -------------------
.../mapper/GridContinuousMapperTask2.java | 93 -----------
.../ignite/loadtests/mapper/TestObject.java | 57 -------
5 files changed, 450 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
deleted file mode 100644
index e7f63dd..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.X;
-
-/**
- * Continuous mapper load test.
- */
-public class GridContinuousMapperLoadTest1 {
- /**
- * Main method.
- *
- * @param args Parameters.
- */
- public static void main(String[] args) {
- try (Ignite g = G.start("examples/config/example-cache.xml")) {
- int max = 30000;
-
- IgniteDataStreamer<Integer, TestObject> ldr = g.dataStreamer("replicated");
-
- for (int i = 0; i < max; i++)
- ldr.addData(i, new TestObject(i, "Test object: " + i));
-
- // Wait for loader to complete.
- ldr.close(false);
-
- X.println("Populated replicated cache.");
-
- g.compute().execute(new GridContinuousMapperTask1(), max);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
deleted file mode 100644
index 68c36ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.X;
-
-/**
- * Continuous mapper load test.
- */
-public class GridContinuousMapperLoadTest2 {
- /**
- * Main method.
- *
- * @param args Parameters.
- * @throws Exception If failed.
- */
- public static void main(String[] args) throws Exception {
- final AtomicInteger jobIdGen = new AtomicInteger();
- final AtomicInteger sentJobs = new AtomicInteger();
-
- final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
-
- /** Worker thread. */
- Thread t = new Thread("mapper-worker") {
- @Override public void run() {
- try {
- while (!Thread.currentThread().isInterrupted())
- queue.put(jobIdGen.incrementAndGet());
- }
- catch (InterruptedException ignore) {
- // No-op.
- }
- }
- };
-
- Ignite g = G.start("examples/config/example-cache.xml");
-
- try {
- int max = 20000;
-
- IgniteDataStreamer<Integer, TestObject> ldr = g.dataStreamer("replicated");
-
- for (int i = 0; i < max; i++)
- ldr.addData(i, new TestObject(i, "Test object: " + i));
-
- // Wait for loader to complete.
- ldr.close(false);
-
- X.println("Populated replicated cache.");
-
- t.start();
-
- while (sentJobs.get() < max) {
- int[] jobIds = new int[10];
-
- for (int i = 0; i < jobIds.length; i++)
- jobIds[i] = queue.take();
-
- sentJobs.addAndGet(10);
-
- g.compute().execute(new GridContinuousMapperTask2(), jobIds);
- }
- }
- finally {
- t.interrupt();
-
- t.join();
-
- G.stopAll(false);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java
deleted file mode 100644
index 4577806..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.compute.ComputeTaskContinuousMapper;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.TaskContinuousMapperResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Test task.
- */
-@SuppressWarnings("TransientFieldNotInitialized")
-@ComputeTaskNoResultCache
-public class GridContinuousMapperTask1 extends ComputeTaskAdapter<Integer, Integer> {
- /** Job ID generator. */
- private final transient AtomicInteger jobIdGen = new AtomicInteger();
-
- /** Mapper. */
- @TaskContinuousMapperResource
- private ComputeTaskContinuousMapper mapper;
-
- /** Grid. */
- @IgniteInstanceResource
- private Ignite g;
-
- /** Blocking queue. */
- private final transient LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
-
- /** Sent jobs count. */
- private final transient AtomicInteger sentJobs = new AtomicInteger();
-
- /** Maximum number of executions. */
- private transient int maxExecs;
-
- /** Worker thread. */
- private transient Thread t = new Thread("mapper-worker") {
- @Override public void run() {
- try {
- while (!Thread.currentThread().isInterrupted())
- queue.put(jobIdGen.getAndIncrement());
- }
- catch (InterruptedException ignore) {
- // No-op.
- }
- }
- };
-
- /**
- * Sends job to node.
- *
- * @param n Node.
- * @throws IgniteException If failed.
- */
- private void sendJob(ClusterNode n) {
- try {
- int jobId = queue.take();
-
- sentJobs.incrementAndGet();
-
- mapper.send(new ComputeJobAdapter(jobId) {
- @IgniteInstanceResource
- private Ignite g;
-
- @Override public Object execute() {
- Integer jobId = argument(0);
-
- X.println(">>> Received job for ID: " + jobId);
-
- return g.cache("replicated").localPeek(jobId, CachePeekMode.ONHEAP);
- }
- }, n);
- }
- catch (InterruptedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Integer arg) {
- maxExecs = arg;
-
- // Start worker thread.
- t.start();
-
- if (g.cluster().nodes().size() == 1)
- sendJob(g.cluster().localNode());
- else
- for (ClusterNode n : g.cluster().forRemotes().nodes())
- sendJob(n);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
- if (res.getException() != null)
- throw new IgniteException(res.getException());
-
- TestObject o = res.getData();
-
- assert o != null;
-
- X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']');
-
- if (sentJobs.get() < maxExecs)
- sendJob(res.getNode());
-
- return ComputeJobResultPolicy.WAIT;
- }
-
- /** {@inheritDoc} */
- @Override public Integer reduce(List<ComputeJobResult> results) {
- X.println(">>> Reducing task...");
-
- t.interrupt();
-
- try {
- t.join();
- }
- catch (InterruptedException e) {
- throw new IgniteException(e);
- }
-
- return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java
deleted file mode 100644
index 9a795c4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Test task.
- */
-public class GridContinuousMapperTask2 extends ComputeTaskAdapter<int[], Integer> {
- /** Grid. */
- @IgniteInstanceResource
- private Ignite g;
-
- /** {@inheritDoc} */
- @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable int[] jobIds) {
- Map<ComputeJob, ClusterNode> mappings = new HashMap<>(jobIds.length);
-
- Iterator<ClusterNode> nodeIter = g.cluster().forRemotes().nodes().iterator();
-
- for (int jobId : jobIds) {
- ComputeJob job = new ComputeJobAdapter(jobId) {
- @IgniteInstanceResource
- private Ignite g;
-
- @Override public Object execute() {
- Integer jobId = argument(0);
-
- X.println(">>> Received job for ID: " + jobId);
-
- return g.cache("replicated").localPeek(jobId, CachePeekMode.ONHEAP);
- }
- };
-
- // If only local node in the grid.
- if (g.cluster().nodes().size() == 1)
- mappings.put(job, g.cluster().localNode());
- else {
- ClusterNode n = nodeIter.hasNext() ? nodeIter.next() :
- (nodeIter = g.cluster().forRemotes().nodes().iterator()).next();
-
- mappings.put(job, n);
- }
- }
-
- return mappings;
- }
-
- /** {@inheritDoc} */
- @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
- TestObject o = res.getData();
-
- X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']');
-
- return super.result(res, rcvd);
- }
-
- /** {@inheritDoc} */
- @Override public Integer reduce(List<ComputeJobResult> results) {
- X.println(">>> Reducing task...");
-
- return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java
deleted file mode 100644
index 9a17e0e..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.io.Serializable;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-
-/**
- * Test object.
- */
-public class TestObject implements Serializable {
- /** ID. */
- @QuerySqlField(index = true)
- private int id;
-
- /** Text. */
- @QuerySqlField
- private String txt;
-
- /**
- * @param id ID.
- * @param txt Text.
- */
- public TestObject(int id, String txt) {
- this.id = id;
- this.txt = txt;
- }
-
- /**
- * @return ID.
- */
- public int getId() {
- return id;
- }
-
- /**
- * @return Text.
- */
- public String getText() {
- return txt;
- }
-}
\ No newline at end of file