You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/08/15 15:15:49 UTC

[2/2] ignite git commit: IGNITE-6284 Add IgniteCompute.withTaskNoResultCache() with similar effect as @ComputeTaskNoResultCache - Fixes #4075.

IGNITE-6284 Add IgniteCompute.withTaskNoResultCache() with similar effect as @ComputeTaskNoResultCache - Fixes #4075.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1691af8d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1691af8d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1691af8d

Branch: refs/heads/master
Commit: 1691af8dd165c049258673e14771de36c2feaaec
Parents: 2e9ca8f
Author: EdShangGG <es...@gridgain.com>
Authored: Wed Aug 15 18:14:55 2018 +0300
Committer: Dmitriy Pavlov <dp...@apache.org>
Committed: Wed Aug 15 18:14:55 2018 +0300

----------------------------------------------------------------------
 .../mapper/GridContinuousMapperLoadTest1.java   |  51 ------
 .../mapper/GridContinuousMapperLoadTest2.java   |  92 -----------
 .../mapper/GridContinuousMapperTask1.java       | 157 -------------------
 .../mapper/GridContinuousMapperTask2.java       |  93 -----------
 .../ignite/loadtests/mapper/TestObject.java     |  57 -------
 5 files changed, 450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
deleted file mode 100644
index e7f63dd..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.X;
-
-/**
- * Continuous mapper load test.
- */
-public class GridContinuousMapperLoadTest1 {
-    /**
-     * Main method.
-     *
-     * @param args Parameters.
-     */
-    public static void main(String[] args) {
-        try (Ignite g = G.start("examples/config/example-cache.xml")) {
-            int max = 30000;
-
-            IgniteDataStreamer<Integer, TestObject> ldr = g.dataStreamer("replicated");
-
-            for (int i = 0; i < max; i++)
-                ldr.addData(i, new TestObject(i, "Test object: " + i));
-
-            // Wait for loader to complete.
-            ldr.close(false);
-
-            X.println("Populated replicated cache.");
-
-            g.compute().execute(new GridContinuousMapperTask1(), max);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
deleted file mode 100644
index 68c36ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.X;
-
-/**
- * Continuous mapper load test.
- */
-public class GridContinuousMapperLoadTest2 {
-    /**
-     * Main method.
-     *
-     * @param args Parameters.
-     * @throws Exception If failed.
-     */
-    public static void main(String[] args) throws Exception {
-        final AtomicInteger jobIdGen = new AtomicInteger();
-        final AtomicInteger sentJobs = new AtomicInteger();
-
-        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
-
-        /** Worker thread. */
-        Thread t = new Thread("mapper-worker") {
-            @Override public void run() {
-                try {
-                    while (!Thread.currentThread().isInterrupted())
-                        queue.put(jobIdGen.incrementAndGet());
-                }
-                catch (InterruptedException ignore) {
-                    // No-op.
-                }
-            }
-        };
-
-        Ignite g = G.start("examples/config/example-cache.xml");
-
-        try {
-            int max = 20000;
-
-            IgniteDataStreamer<Integer, TestObject> ldr = g.dataStreamer("replicated");
-
-            for (int i = 0; i < max; i++)
-                ldr.addData(i, new TestObject(i, "Test object: " + i));
-
-            // Wait for loader to complete.
-            ldr.close(false);
-
-            X.println("Populated replicated cache.");
-
-            t.start();
-
-            while (sentJobs.get() < max) {
-                int[] jobIds = new int[10];
-
-                for (int i = 0; i < jobIds.length; i++)
-                    jobIds[i] = queue.take();
-
-                sentJobs.addAndGet(10);
-
-                g.compute().execute(new GridContinuousMapperTask2(), jobIds);
-            }
-        }
-        finally {
-            t.interrupt();
-
-            t.join();
-
-            G.stopAll(false);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java
deleted file mode 100644
index 4577806..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask1.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.compute.ComputeTaskContinuousMapper;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.TaskContinuousMapperResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Test task.
- */
-@SuppressWarnings("TransientFieldNotInitialized")
-@ComputeTaskNoResultCache
-public class GridContinuousMapperTask1 extends ComputeTaskAdapter<Integer, Integer> {
-    /** Job ID generator. */
-    private final transient AtomicInteger jobIdGen = new AtomicInteger();
-
-    /** Mapper. */
-    @TaskContinuousMapperResource
-    private ComputeTaskContinuousMapper mapper;
-
-    /** Grid. */
-    @IgniteInstanceResource
-    private Ignite g;
-
-    /** Blocking queue. */
-    private final transient LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
-
-    /** Sent jobs count. */
-    private final transient AtomicInteger sentJobs = new AtomicInteger();
-
-    /** Maximum number of executions. */
-    private transient int maxExecs;
-
-    /** Worker thread. */
-    private transient Thread t = new Thread("mapper-worker") {
-        @Override public void run() {
-            try {
-                while (!Thread.currentThread().isInterrupted())
-                    queue.put(jobIdGen.getAndIncrement());
-            }
-            catch (InterruptedException ignore) {
-                // No-op.
-            }
-        }
-    };
-
-    /**
-     * Sends job to node.
-     *
-     * @param n Node.
-     * @throws IgniteException If failed.
-     */
-    private void sendJob(ClusterNode n) {
-        try {
-            int jobId = queue.take();
-
-            sentJobs.incrementAndGet();
-
-            mapper.send(new ComputeJobAdapter(jobId) {
-                @IgniteInstanceResource
-                private Ignite g;
-
-                @Override public Object execute() {
-                    Integer jobId = argument(0);
-
-                    X.println(">>> Received job for ID: " + jobId);
-
-                    return g.cache("replicated").localPeek(jobId, CachePeekMode.ONHEAP);
-                }
-            }, n);
-        }
-        catch (InterruptedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Integer arg) {
-        maxExecs = arg;
-
-        // Start worker thread.
-        t.start();
-
-        if (g.cluster().nodes().size() == 1)
-            sendJob(g.cluster().localNode());
-        else
-            for (ClusterNode n : g.cluster().forRemotes().nodes())
-                sendJob(n);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        if (res.getException() != null)
-            throw new IgniteException(res.getException());
-
-        TestObject o = res.getData();
-
-        assert o != null;
-
-        X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']');
-
-        if (sentJobs.get() < maxExecs)
-            sendJob(res.getNode());
-
-        return ComputeJobResultPolicy.WAIT;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Integer reduce(List<ComputeJobResult> results) {
-        X.println(">>> Reducing task...");
-
-        t.interrupt();
-
-        try {
-            t.join();
-        }
-        catch (InterruptedException e) {
-            throw new IgniteException(e);
-        }
-
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java
deleted file mode 100644
index 9a795c4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperTask2.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Test task.
- */
-public class GridContinuousMapperTask2 extends ComputeTaskAdapter<int[], Integer> {
-    /** Grid. */
-    @IgniteInstanceResource
-    private Ignite g;
-
-    /** {@inheritDoc} */
-    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable int[] jobIds) {
-        Map<ComputeJob, ClusterNode> mappings = new HashMap<>(jobIds.length);
-
-        Iterator<ClusterNode> nodeIter = g.cluster().forRemotes().nodes().iterator();
-
-        for (int jobId : jobIds) {
-            ComputeJob job = new ComputeJobAdapter(jobId) {
-                @IgniteInstanceResource
-                private Ignite g;
-
-                @Override public Object execute() {
-                    Integer jobId = argument(0);
-
-                    X.println(">>> Received job for ID: " + jobId);
-
-                    return g.cache("replicated").localPeek(jobId, CachePeekMode.ONHEAP);
-                }
-            };
-
-            // If only local node in the grid.
-            if (g.cluster().nodes().size() == 1)
-                mappings.put(job, g.cluster().localNode());
-            else {
-                ClusterNode n = nodeIter.hasNext() ? nodeIter.next() :
-                    (nodeIter = g.cluster().forRemotes().nodes().iterator()).next();
-
-                mappings.put(job, n);
-            }
-        }
-
-        return mappings;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        TestObject o = res.getData();
-
-        X.println("Received job result from node [resId=" + o.getId() + ", node=" + res.getNode().id() + ']');
-
-        return super.result(res, rcvd);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Integer reduce(List<ComputeJobResult> results) {
-        X.println(">>> Reducing task...");
-
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1691af8d/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java
deleted file mode 100644
index 9a17e0e..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/TestObject.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.mapper;
-
-import java.io.Serializable;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-
-/**
- * Test object.
- */
-public class TestObject implements Serializable {
-    /** ID. */
-    @QuerySqlField(index = true)
-    private int id;
-
-    /** Text. */
-    @QuerySqlField
-    private String txt;
-
-    /**
-     * @param id ID.
-     * @param txt Text.
-     */
-    public TestObject(int id, String txt) {
-        this.id = id;
-        this.txt = txt;
-    }
-
-    /**
-     * @return ID.
-     */
-    public int getId() {
-        return id;
-    }
-
-    /**
-     * @return Text.
-     */
-    public String getText() {
-        return txt;
-    }
-}
\ No newline at end of file