You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by yu...@apache.org on 2016/01/25 13:57:27 UTC

[1/2] reef git commit: [REEF-1130] Worker side VortexAggregateFunction reception

Repository: reef
Updated Branches:
  refs/heads/master 86654acd4 -> a1f622519


http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
index 2828ccb..110143f 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
@@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver;
 
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.vortex.common.AggregateFunctionRepository;
 import org.junit.Test;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 1b9abac..2b7cf54 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -76,13 +76,14 @@ public final class TestUtil {
       public Object answer(final InvocationOnMock invocation) throws Throwable {
         final VortexRequest request = (VortexRequest)invocation.getArguments()[1];
         if (request instanceof TaskletCancellationRequest) {
-          final TaskletReport cancelReport = new TaskletCancelledReport(request.getTaskletId());
+          final TaskletReport cancelReport = new TaskletCancelledReport(
+              ((TaskletCancellationRequest)request).getTaskletId());
           master.workerReported(workerManager.getId(), new WorkerReport(Collections.singleton(cancelReport)));
         }
 
         return null;
       }
-    }).when(vortexRequestor).send(any(RunningTask.class), any(VortexRequest.class));
+    }).when(vortexRequestor).sendAsync(any(RunningTask.class), any(VortexRequest.class));
 
     return workerManager;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
index 69b04c6..4dfa9e7 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
@@ -85,7 +85,7 @@ public final class AddOneTest {
         .set(VortexMasterConf.WORKER_MEM, 64)
         .set(VortexMasterConf.WORKER_CORES, 4)
         .set(VortexMasterConf.WORKER_CAPACITY, 2000)
-        .set(VortexMasterConf.VORTEX_START, AddOneCallbackTestStart.class)
+        .set(VortexMasterConf.VORTEX_START, AddOneTestStart.class)
         .build();
 
     final VortexJobConf vortexJobConf = VortexJobConf.newBuilder()

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTest.java
new file mode 100644
index 0000000..7a2f229
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.sumones;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.vortex.driver.VortexJobConf;
+import org.apache.reef.vortex.driver.VortexMasterConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Launch the SumOnes Vortex test.
+ */
+public final class SumOnesTest {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Prepare the test environment before the test runs.
+   */
+  @Before
+  public void setUp() throws Exception {
+    testEnvironment.setUp();
+  }
+
+  /**
+   * Release the test environment once the test has finished.
+   */
+  @After
+  public void tearDown() throws Exception {
+    testEnvironment.tearDown();
+  }
+
+  /**
+   * Launch the SumOnes Vortex job and expect the launcher to report success.
+   */
+  @Test
+  public void testVortexSumOnes() {
+    final Configuration masterConf = VortexMasterConf.CONF
+        .set(VortexMasterConf.WORKER_NUM, 2)
+        .set(VortexMasterConf.WORKER_MEM, 64)
+        .set(VortexMasterConf.WORKER_CORES, 4)
+        .set(VortexMasterConf.WORKER_CAPACITY, 2000)
+        .set(VortexMasterConf.VORTEX_START, SumOnesTestStart.class)
+        .build();
+
+    final Configuration jobConf = VortexJobConf.newBuilder()
+        .setJobName("TEST_Vortex_SumOnesTest")
+        .setVortexMasterConf(masterConf)
+        .build()
+        .getConfiguration();
+
+    final LauncherStatus launcherStatus = testEnvironment.run(jobConf);
+    Assert.assertTrue("Job state after execution: " + launcherStatus, launcherStatus.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
new file mode 100644
index 0000000..c742d3e
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.sumones;
+
+import org.apache.reef.vortex.api.*;
+import org.apache.reef.vortex.examples.sumones.AdditionAggregateFunction;
+import org.apache.reef.vortex.examples.sumones.IdentityFunction;
+
+import javax.inject.Inject;
+import java.util.Vector;
+
+/**
+ * Test correctness of an aggregation function that adds integer outputs (ones) on Vortex.
+ */
+public final class SumOnesTestStart implements VortexStart {
+  @Inject
+  private SumOnesTestStart() {
+  }
+
+  /**
+   * Sums {@code numberOfOnesToSum} ones through a Vortex aggregation and verifies every partial result.
+   * Checks throw explicitly because the {@code assert} keyword is a no-op unless the JVM runs with -ea.
+   */
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final int numberOfOnesToSum = 1000;
+    final Vector<Integer> inputVector = new Vector<>();
+    for (int i = 0; i < numberOfOnesToSum; i++) {
+      inputVector.add(1);
+    }
+
+    final VortexAggregateFuture<Integer, Integer> future =
+        vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), inputVector);
+
+    try {
+      int allSum = 0;
+      AggregateResultSynchronous<Integer, Integer> result;
+      // do-while so that the first partial result is validated exactly like the later ones.
+      do {
+        result = future.get();
+        if (result == null) {
+          throw new IllegalStateException("Aggregation ended after summing only " + allSum + " ones.");
+        }
+        final int sumResult = getAggregateResult(result);
+        // Every input is a one, so a partial sum must equal the number of inputs it aggregated.
+        if (sumResult != result.getAggregatedInputs().size()) {
+          throw new IllegalStateException("Partial sum " + sumResult + " does not match its input count.");
+        }
+        allSum += sumResult;
+      } while (result.hasNext());
+
+      if (allSum != numberOfOnesToSum) {
+        throw new IllegalStateException("Expected a total of " + numberOfOnesToSum + " but got " + allSum);
+      }
+    } catch (final InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ie);
+    }
+  }
+
+  /** Unwraps the aggregate value, rethrowing a {@link VortexAggregateException} as a RuntimeException. */
+  private static int getAggregateResult(final AggregateResultSynchronous<Integer, Integer> result) {
+    try {
+      return result.getAggregateResult();
+    } catch (final VortexAggregateException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/package-info.java
new file mode 100644
index 0000000..e2f18de
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Vortex SumOnes Aggregation test.
+ */
+package org.apache.reef.tests.applications.vortex.sumones;


[2/2] reef git commit: [REEF-1130] Worker side VortexAggregateFunction reception

Posted by yu...@apache.org.
[REEF-1130] Worker side VortexAggregateFunction reception

This addressed the issue by
  * Modifying and adding Avro classes to facilitate submission of Aggregate Functions to VortexWorkers.
  * Adding simple aggregation logic on VortexWorkers without AggregationPolicy.
  * Adding a simple aggregation scenario and test.

JIRA:
  [REEF-1130](https://issues.apache.org/jira/browse/REEF-1130)

Pull request:
  This closes #788


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a1f62251
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a1f62251
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a1f62251

Branch: refs/heads/master
Commit: a1f622519a986a3e2121cfea4f5e06f0410815cb
Parents: 86654ac
Author: Andrew Chung <af...@gmail.com>
Authored: Wed Jan 20 10:46:35 2016 -0800
Committer: Yunseong Lee <yu...@apache.org>
Committed: Mon Jan 25 20:56:04 2016 +0800

----------------------------------------------------------------------
 .../src/main/avro/VortexRequest.avsc            |  30 +++-
 .../apache/reef/vortex/api/AggregateResult.java |  24 +--
 .../vortex/api/AggregateResultSynchronous.java  |  75 ++++++++
 .../reef/vortex/api/VortexAggregateFuture.java  |  71 ++++----
 .../common/AggregateFunctionRepository.java     |  71 ++++++++
 .../TaskletAggregateExecutionRequest.java       |  69 ++++++++
 .../common/TaskletAggregationRequest.java       |  94 ++++++++++
 .../common/TaskletCancellationRequest.java      |   4 +-
 .../vortex/common/TaskletExecutionRequest.java  |   3 +-
 .../reef/vortex/common/VortexAvroUtils.java     |  91 ++++++++--
 .../reef/vortex/common/VortexRequest.java       |   9 +-
 .../driver/AggregateFunctionRepository.java     |  51 ------
 .../reef/vortex/driver/DefaultVortexMaster.java |   2 +-
 .../reef/vortex/driver/RunningWorkers.java      |  11 +-
 .../apache/reef/vortex/driver/VortexDriver.java |   5 +-
 .../reef/vortex/driver/VortexRequestor.java     |  18 +-
 .../reef/vortex/driver/VortexWorkerManager.java |  44 ++++-
 .../vortex/evaluator/AggregateContainer.java    | 117 +++++++++++++
 .../reef/vortex/evaluator/VortexWorker.java     | 175 +++++++++++++------
 .../sumones/AdditionAggregateFunction.java      |  48 +++++
 .../examples/sumones/IdentityFunction.java      |  48 +++++
 .../reef/vortex/examples/sumones/SumOnes.java   |  64 +++++++
 .../examples/sumones/SumOnesAggregateStart.java |  82 +++++++++
 .../vortex/examples/sumones/package-info.java   |  22 +++
 .../reef/vortex/driver/RunningWorkersTest.java  |   1 +
 .../org/apache/reef/vortex/driver/TestUtil.java |   5 +-
 .../applications/vortex/addone/AddOneTest.java  |   2 +-
 .../vortex/sumones/SumOnesTest.java             |  76 ++++++++
 .../vortex/sumones/SumOnesTestStart.java        |  85 +++++++++
 .../vortex/sumones/package-info.java            |  22 +++
 30 files changed, 1222 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
index bf4396e..d0a02fb 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
@@ -20,6 +20,26 @@
   {
     "namespace": "org.apache.reef.vortex.common.avro",
     "type": "record",
+    "name": "AvroTaskletAggregateExecutionRequest",
+    "fields": [
+      {"name": "taskletId", "type": "int"},
+      {"name": "aggregateFunctionId", "type": "int"},
+      {"name": "serializedInput", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroTaskletAggregationRequest",
+    "fields": [
+      {"name": "aggregateFunctionId", "type": "int"},
+      {"name": "serializedUserFunction", "type": "bytes"},
+      {"name": "serializedAggregateFunction", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
     "name": "AvroTaskletExecutionRequest",
     "fields": [
       {"name": "taskletId", "type": "int"},
@@ -41,11 +61,17 @@
       {
         "name": "requestType",
         "type": {"type": "enum", "name": "AvroRequestType",
-        "symbols": ["ExecuteTasklet", "CancelTasklet"]}
+        "symbols": ["ExecuteTasklet", "CancelTasklet", "Aggregate", "AggregateExecute"]}
       },
       {
         "name": "taskletRequest",
-        "type": ["null", "AvroTaskletExecutionRequest", "AvroTaskletCancellationRequest"],
+        "type": [
+          "null",
+          "AvroTaskletAggregateExecutionRequest",
+          "AvroTaskletAggregationRequest",
+          "AvroTaskletExecutionRequest",
+          "AvroTaskletCancellationRequest"
+        ],
         "default": null
       }
     ]

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
index 22fb93f..555ef89 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * The result of an aggregate.
+ * The result of an aggregate. Registered to callbacks for {@link VortexAggregateFuture}.
  */
 @Public
 @ClientSide
@@ -36,28 +36,23 @@ public final class AggregateResult<TInput, TOutput> {
 
   private final Optional<TOutput> aggregatedOutput;
   private final List<TInput> inputList;
-  private final boolean hasNext;
   private final Optional<Exception> exception;
 
   AggregateResult(final Exception exception,
-                  final List<TInput> inputList,
-                  final boolean hasNext) {
-    this(Optional.<TOutput>empty(), Optional.of(exception), inputList, hasNext);
+                  final List<TInput> inputList) {
+    this(Optional.<TOutput>empty(), Optional.of(exception), inputList);
   }
 
   AggregateResult(final TOutput aggregatedOutput,
-                  final List<TInput> inputList,
-                  final boolean hasNext) {
-    this(Optional.of(aggregatedOutput), Optional.<Exception>empty(), inputList, hasNext);
+                  final List<TInput> inputList) {
+    this(Optional.of(aggregatedOutput), Optional.<Exception>empty(), inputList);
   }
 
   private AggregateResult(final Optional<TOutput> aggregatedOutput,
                           final Optional<Exception> exception,
-                          final List<TInput> inputList,
-                          final boolean hasNext) {
+                          final List<TInput> inputList) {
     this.aggregatedOutput = aggregatedOutput;
     this.inputList = Collections.unmodifiableList(inputList);
-    this.hasNext = hasNext;
     this.exception = exception;
   }
 
@@ -90,11 +85,4 @@ public final class AggregateResult<TInput, TOutput> {
   public Optional<Exception> getException() {
     return exception;
   }
-
-  /**
-   * @return true if more results will be available, false otherwise.
-   */
-  public boolean hasNext() {
-    return hasNext;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java
new file mode 100644
index 0000000..c19bafe
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.util.Optional;
+
+import java.util.List;
+
+/**
+ * The synchronous result of an aggregate, returned by {@link VortexAggregateFuture#get()}.
+ */
+@Public
+@ClientSide
+@Unstable
+public final class AggregateResultSynchronous<TInput, TOutput> {
+  private final AggregateResult<TInput, TOutput> result;
+  private final boolean hasNext;
+
+  AggregateResultSynchronous(final AggregateResult<TInput, TOutput> result, final boolean hasNext) {
+    this.result = result;
+    this.hasNext = hasNext;
+  }
+
+  /**
+   * @return the output of an aggregation, throws the Exception if a Tasklet or an aggregation fails.
+   * If an aggregation fails, {@link VortexAggregateException} will be thrown, otherwise
+   * the Exception that caused the Tasklet to fail will be thrown directly.
+   * @throws VortexAggregateException the Exception that caused the Tasklet or aggregation failure.
+   */
+  public TOutput getAggregateResult() throws VortexAggregateException {
+    return result.getAggregateResult();
+  }
+
+  /**
+   * @return the associated inputs of an aggregation, as reported by the wrapped {@link AggregateResult}.
+   */
+  public List<TInput> getAggregatedInputs() {
+    return result.getAggregatedInputs();
+  }
+
+  /**
+   * Exposes the failure held by the wrapped {@link AggregateResult} as a value;
+   * this accessor only returns it and does not throw it.
+   * @return the Exception that caused the Tasklet or aggregation failure, if any.
+   */
+  public Optional<Exception> getException() {
+    return result.getException();
+  }
+
+  /**
+   * @return true if more results will be available, false otherwise.
+   */
+  public boolean hasNext() {
+    return hasNext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
index 23017b7..6e74d22 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
@@ -19,6 +19,8 @@
 package org.apache.reef.vortex.api;
 
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
@@ -29,7 +31,6 @@ import org.apache.reef.vortex.common.VortexFutureDelegate;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.logging.Logger;
 
 /**
  * The interface between user code and aggregation Tasklets.
@@ -41,12 +42,10 @@ import java.util.logging.Logger;
 @NotThreadSafe
 @Unstable
 public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate {
-  private static final Logger LOG = Logger.getLogger(VortexAggregateFuture.class.getName());
-
   private final Executor executor;
   private final Codec<TOutput> aggOutputCodec;
-  private final BlockingQueue<AggregateResult> resultQueue;
-  private final Map<Integer, TInput> taskletIdInputMap;
+  private final BlockingQueue<Pair<List<Integer>, AggregateResult>> resultQueue;
+  private final ConcurrentMap<Integer, TInput> taskletIdInputMap;
   private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler;
 
   @Private
@@ -55,7 +54,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
                                final Codec<TOutput> aggOutputCodec,
                                final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) {
     this.executor = executor;
-    this.taskletIdInputMap = new HashMap<>(taskletIdInputMap);
+    this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap);
     this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size());
     this.aggOutputCodec = aggOutputCodec;
     this.callbackHandler = callbackHandler;
@@ -64,12 +63,15 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
   /**
    * @return the next aggregation result for the future, null if no more results.
    */
-  public synchronized AggregateResult get() throws InterruptedException {
+  public synchronized AggregateResultSynchronous<TInput, TOutput> get() throws InterruptedException {
     if (taskletIdInputMap.isEmpty()) {
       return null;
     }
 
-    return resultQueue.take();
+    final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.take();
+
+    removeFromTaskletIdInputMap(resultPair.getLeft());
+    return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty());
   }
 
   /**
@@ -78,26 +80,33 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
    * @return the next aggregation result for the future, within the user specified timeout, null if no more results.
    * @throws TimeoutException if time out hits.
    */
-  public synchronized AggregateResult get(final long timeout,
-                                          final TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+  public synchronized AggregateResultSynchronous<TInput, TOutput> get(final long timeout, final TimeUnit timeUnit)
+      throws InterruptedException, TimeoutException {
     if (taskletIdInputMap.isEmpty()) {
       return null;
     }
 
-    final AggregateResult result = resultQueue.poll(timeout, timeUnit);
+    final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.poll(timeout, timeUnit);
 
-    if (result == null) {
+    if (resultPair == null) {
       throw new TimeoutException();
     }
 
-    return result;
+    removeFromTaskletIdInputMap(resultPair.getLeft());
+    return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty());
+  }
+
+  private void removeFromTaskletIdInputMap(final List<Integer> taskletIds) {
+    for (final int taskletId : taskletIds) {
+      taskletIdInputMap.remove(taskletId);
+    }
   }
 
   /**
    * @return true if there are no more results to poll.
    */
-  public synchronized boolean isDone() {
-    return taskletIdInputMap.size() == 0;
+  public boolean isDone() {
+    return taskletIdInputMap.isEmpty();
   }
 
   /**
@@ -109,7 +118,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
     try {
       // TODO[REEF-1113]: Handle serialization failure separately in Vortex
       final TOutput result = aggOutputCodec.decode(serializedResult);
-      removeCompletedTasklets(result, Collections.singletonList(taskletId));
+      completedTasklets(result, Collections.singletonList(taskletId));
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);
     }
@@ -124,7 +133,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
     try {
       // TODO[REEF-1113]: Handle serialization failure separately in Vortex
       final TOutput result = aggOutputCodec.decode(serializedResult);
-      removeCompletedTasklets(result, taskletIds);
+      completedTasklets(result, taskletIds);
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);
     }
@@ -137,7 +146,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
   @Override
   public void threwException(final int taskletId, final Exception exception) {
     try {
-      removeFailedTasklets(exception, Collections.singletonList(taskletId));
+      failedTasklets(exception, Collections.singletonList(taskletId));
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);
     }
@@ -150,7 +159,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
   @Override
   public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
     try {
-      removeFailedTasklets(exception, taskletIds);
+      failedTasklets(exception, taskletIds);
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);
     }
@@ -166,12 +175,12 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
   }
 
   /**
-   * Removes completed Tasklets from Tasklets that are expected and invoke callback.
+   * Create and queue result for Tasklets that are expected and invoke callback.
    */
-  private synchronized void removeCompletedTasklets(final TOutput output, final List<Integer> taskletIds)
+  private void completedTasklets(final TOutput output, final List<Integer> taskletIds)
       throws InterruptedException {
-    final AggregateResult result =
-        new AggregateResult(output, getInputs(taskletIds), taskletIdInputMap.size() > 0);
+    final List<TInput> inputs = getInputs(taskletIds);
+    final AggregateResult result = new AggregateResult(output, inputs);
 
     if (callbackHandler != null) {
       executor.execute(new Runnable() {
@@ -182,41 +191,39 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur
       });
     }
 
-    resultQueue.put(result);
+    resultQueue.put(new ImmutablePair<>(taskletIds, result));
   }
 
   /**
-   * Removes failed Tasklets from Tasklets that are expected and invokes callback.
+   * Creates and queues the result for failed Tasklets that are expected and invokes the callback.
    */
-  private synchronized void removeFailedTasklets(final Exception exception, final List<Integer> taskletIds)
+  private void failedTasklets(final Exception exception, final List<Integer> taskletIds)
       throws InterruptedException {
 
     final List<TInput> inputs = getInputs(taskletIds);
-    final AggregateResult failure =
-        new AggregateResult(exception, inputs, taskletIdInputMap.size() > 0);
+    final AggregateResult failure = new AggregateResult(exception, inputs);
 
     if (callbackHandler != null) {
       executor.execute(new Runnable() {
         @Override
         public void run() {
-          // TODO[JIRA REEF-1129]: Add documentation in VortexThreadPool.
           callbackHandler.onFailure(new VortexAggregateException(exception, inputs));
         }
       });
     }
 
-    resultQueue.put(failure);
+    resultQueue.put(new ImmutablePair<>(taskletIds, failure));
   }
 
   /**
    * Gets the inputs on Tasklet aggregation completion.
    */
-  private synchronized List<TInput> getInputs(final List<Integer> taskletIds) {
+  private List<TInput> getInputs(final List<Integer> taskletIds) {
 
     final List<TInput> inputList = new ArrayList<>(taskletIds.size());
 
     for(final int taskletId : taskletIds) {
-      inputList.add(taskletIdInputMap.remove(taskletId));
+      inputList.add(taskletIdInputMap.get(taskletId));
     }
 
     return inputList;

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
new file mode 100644
index 0000000..e70ee1a
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexFunction;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A repository for {@link VortexAggregateFunction} and its associated {@link VortexFunction},
+ * used to pass functions between VortexMaster and RunningWorkers, as well as used to cache functions
+ * for VortexWorkers on AggregateRequests and AggregateExecutionRequests.
+ */
+@ThreadSafe
+@Unstable
+@Private
+public final class AggregateFunctionRepository {
+  private final ConcurrentMap<Integer, Pair<VortexAggregateFunction, VortexFunction>>
+      aggregateFunctionMap = new ConcurrentHashMap<>();
+
+  @Inject
+  private AggregateFunctionRepository() {
+  }
+
+  /**
+   * Associates an aggregate function ID with a {@link VortexAggregateFunction} and a {@link VortexFunction},
+   * replacing any pair that was previously registered under the same ID.
+   * @param aggregateFunctionId the ID to register the functions under.
+   * @param aggregateFunction the aggregate function to register.
+   * @param function the function to register alongside the aggregate function.
+   * @return the pair previously registered under the ID, or null if there was none.
+   */
+  public Pair<VortexAggregateFunction, VortexFunction> put(final int aggregateFunctionId,
+                                                           final VortexAggregateFunction aggregateFunction,
+                                                           final VortexFunction function) {
+    return aggregateFunctionMap.put(aggregateFunctionId, new ImmutablePair<>(aggregateFunction, function));
+  }
+
+  /**
+   * Gets the {@link VortexAggregateFunction} associated with the aggregate function ID.
+   * @param aggregateFunctionId the ID the functions were registered under.
+   * @return the registered aggregate function.
+   * @throws IllegalArgumentException if nothing is registered under the ID.
+   */
+  public VortexAggregateFunction getAggregateFunction(final int aggregateFunctionId) {
+    return getRegisteredPair(aggregateFunctionId).getLeft();
+  }
+
+  /**
+   * Gets the {@link VortexFunction} associated with the aggregate function ID.
+   * @param aggregateFunctionId the ID the functions were registered under.
+   * @return the registered function.
+   * @throws IllegalArgumentException if nothing is registered under the ID.
+   */
+  public VortexFunction getFunction(final int aggregateFunctionId) {
+    return getRegisteredPair(aggregateFunctionId).getRight();
+  }
+
+  /**
+   * Looks up the pair registered under the ID, failing with a descriptive error instead of a bare
+   * NullPointerException when the ID is unknown.
+   */
+  private Pair<VortexAggregateFunction, VortexFunction> getRegisteredPair(final int aggregateFunctionId) {
+    final Pair<VortexAggregateFunction, VortexFunction> pair = aggregateFunctionMap.get(aggregateFunctionId);
+    if (pair == null) {
+      throw new IllegalArgumentException(
+          "No functions registered for aggregate function ID " + aggregateFunctionId);
+    }
+    return pair;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
new file mode 100644
index 0000000..db850fc
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Request issued by the Vortex Driver asking for an aggregate-able function to be run on a single input.
+ */
+@Unstable
+@Private
+@DriverSide
+public final class TaskletAggregateExecutionRequest<TInput> implements VortexRequest {
+  private final int taskletId;
+  private final int aggregateFunctionId;
+  private final TInput input;
+
+  /**
+   * @param taskletId ID of the tasklet that this request corresponds to.
+   * @param aggregateFunctionId the AggregateFunctionID of the request.
+   * @param input input of the request.
+   */
+  public TaskletAggregateExecutionRequest(final int taskletId,
+                                          final int aggregateFunctionId,
+                                          final TInput input) {
+    this.taskletId = taskletId;
+    this.aggregateFunctionId = aggregateFunctionId;
+    this.input = input;
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.ExecuteAggregateTasklet;
+  }
+
+  /**
+   * @return tasklet ID corresponding to the tasklet request.
+   */
+  public int getTaskletId() {
+    return this.taskletId;
+  }
+
+  /**
+   * @return the AggregateFunctionID of the request.
+   */
+  public int getAggregateFunctionId() {
+    return this.aggregateFunctionId;
+  }
+
+  /**
+   * @return input of the request.
+   */
+  public TInput getInput() {
+    return this.input;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
new file mode 100644
index 0000000..6a7e289
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexFunction;
+
+import java.util.List;
+
+/**
+ * A request from the Vortex Driver for the {@link org.apache.reef.vortex.evaluator.VortexWorker} to
+ * record aggregate functions for later execution.
+ */
+@Unstable
+@Private
+@DriverSide
+public final class TaskletAggregationRequest<TInput, TOutput> implements VortexRequest {
+  private final int aggregateFunctionId;
+  private final VortexAggregateFunction<TOutput> userAggregateFunction;
+  private final VortexFunction<TInput, TOutput> function;
+
+  /**
+   * @param aggregateFunctionId the AggregateFunctionID of the aggregate function.
+   * @param aggregateFunction the aggregate function as specified by the user.
+   * @param function the user specified function.
+   */
+  public TaskletAggregationRequest(final int aggregateFunctionId,
+                                   final VortexAggregateFunction<TOutput> aggregateFunction,
+                                   final VortexFunction<TInput, TOutput> function) {
+    this.aggregateFunctionId = aggregateFunctionId;
+    this.userAggregateFunction = aggregateFunction;
+    this.function = function;
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.AggregateTasklets;
+  }
+
+  /**
+   * @return the AggregateFunctionID of the aggregate function.
+   */
+  public int getAggregateFunctionId() {
+    return aggregateFunctionId;
+  }
+
+  /**
+   * @return the aggregate function as specified by the user.
+   */
+  public VortexAggregateFunction<TOutput> getAggregateFunction() {
+    return userAggregateFunction;
+  }
+
+  /**
+   * @return the user specified function.
+   */
+  public VortexFunction<TInput, TOutput> getFunction() {
+    return function;
+  }
+
+  /**
+   * Execute the aggregate function using the list of outputs.
+   * @param outputs the outputs to hand to the user specified aggregate function.
+   * @return Output of the function in a serialized form.
+   * @throws Exception any exception raised while aggregating or encoding the result.
+   */
+  public byte[] executeAggregation(final List<TOutput> outputs) throws Exception {
+    final TOutput output = userAggregateFunction.call(outputs);
+    final Codec<TOutput> codec = userAggregateFunction.getOutputCodec();
+
+    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+    return codec.encode(output);
+  }
+
+  /**
+   * Execute the user specified function.
+   * @param input the input to hand to the user specified function.
+   * @return the output produced by the user specified function.
+   * @throws Exception any exception raised by the user specified function.
+   */
+  public TOutput executeFunction(final TInput input) throws Exception {
+    return function.call(input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
index 8f725a8..88f2d89 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
@@ -31,7 +31,9 @@ public final class TaskletCancellationRequest implements VortexRequest {
     this.taskletId = taskletId;
   }
 
-  @Override
+  /**
+   * @return the ID of the VortexTasklet associated with this VortexRequest.
+   */
   public int getTaskletId() {
     return taskletId;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
index d85c69b..e850c9a 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -64,9 +64,8 @@ public final class TaskletExecutionRequest<TInput, TOutput> implements VortexReq
   }
 
   /**
-   * Get id of the tasklet.
+   * @return the ID of the VortexTasklet associated with this VortexRequest.
    */
-  @Override
   public int getTaskletId() {
     return taskletId;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index 2200af3..ce6b0dd 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -25,9 +25,11 @@ import org.apache.commons.lang.SerializationUtils;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.common.avro.*;
 
+import javax.inject.Inject;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -41,15 +43,56 @@ import java.util.List;
 @DriverSide
 @Unstable
 public final class VortexAvroUtils {
+  private final AggregateFunctionRepository aggregateFunctionRepository;
+
+  @Inject
+  private VortexAvroUtils(final AggregateFunctionRepository aggregateFunctionRepository) {
+    this.aggregateFunctionRepository = aggregateFunctionRepository;
+  }
+
   /**
    * Serialize VortexRequest to byte array.
    * @param vortexRequest Vortex request message to serialize.
    * @return Serialized byte array.
    */
-  public static byte[] toBytes(final VortexRequest vortexRequest) {
+  public byte[] toBytes(final VortexRequest vortexRequest) {
     // Convert VortexRequest message to Avro message.
     final AvroVortexRequest avroVortexRequest;
     switch (vortexRequest.getType()) {
+    case ExecuteAggregateTasklet:
+      final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
+          (TaskletAggregateExecutionRequest) vortexRequest;
+      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+      final byte[] serializedInputForAggregate =
+        aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId())
+            .getInputCodec().encode(taskletAggregateExecutionRequest.getInput());
+      avroVortexRequest = AvroVortexRequest.newBuilder()
+          .setRequestType(AvroRequestType.AggregateExecute)
+          .setTaskletRequest(
+              AvroTaskletAggregateExecutionRequest.newBuilder()
+                  .setAggregateFunctionId(taskletAggregateExecutionRequest.getAggregateFunctionId())
+                  .setSerializedInput(ByteBuffer.wrap(serializedInputForAggregate))
+                  .setTaskletId(taskletAggregateExecutionRequest.getTaskletId())
+                  .build())
+          .build();
+      break;
+    case AggregateTasklets:
+      final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest;
+
+      // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
+      final byte[] serializedAggregateFunction = SerializationUtils.serialize(
+          taskletAggregationRequest.getAggregateFunction());
+      final byte[] serializedFunctionForAggregation = SerializationUtils.serialize(
+          taskletAggregationRequest.getFunction());
+      avroVortexRequest = AvroVortexRequest.newBuilder()
+          .setRequestType(AvroRequestType.Aggregate)
+          .setTaskletRequest(AvroTaskletAggregationRequest.newBuilder()
+              .setAggregateFunctionId(taskletAggregationRequest.getAggregateFunctionId())
+              .setSerializedAggregateFunction(ByteBuffer.wrap(serializedAggregateFunction))
+              .setSerializedUserFunction(ByteBuffer.wrap(serializedFunctionForAggregation))
+              .build())
+          .build();
+      break;
     case ExecuteTasklet:
       final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
       // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504).
@@ -93,7 +136,7 @@ public final class VortexAvroUtils {
    * @param workerReport Worker report message to serialize.
    * @return Serialized byte array.
    */
-  public static byte[] toBytes(final WorkerReport workerReport) {
+  public byte[] toBytes(final WorkerReport workerReport) {
     final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>();
 
     for (final TaskletReport taskletReport : workerReport.getTaskletReports()) {
@@ -179,11 +222,31 @@ public final class VortexAvroUtils {
    * @param bytes Byte array to deserialize.
    * @return De-serialized VortexRequest.
    */
-  public static VortexRequest toVortexRequest(final byte[] bytes) {
+  public VortexRequest toVortexRequest(final byte[] bytes) {
     final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, AvroVortexRequest.class);
 
     final VortexRequest vortexRequest;
     switch (avroVortexRequest.getRequestType()) {
+    case AggregateExecute:
+      final AvroTaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
+          (AvroTaskletAggregateExecutionRequest)avroVortexRequest.getTaskletRequest();
+      vortexRequest = new TaskletAggregateExecutionRequest<>(taskletAggregateExecutionRequest.getTaskletId(),
+          taskletAggregateExecutionRequest.getAggregateFunctionId(),
+          aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId())
+              .getInputCodec().decode(taskletAggregateExecutionRequest.getSerializedInput().array()));
+      break;
+    case Aggregate:
+      final AvroTaskletAggregationRequest taskletAggregationRequest =
+          (AvroTaskletAggregationRequest)avroVortexRequest.getTaskletRequest();
+      final VortexAggregateFunction aggregateFunction =
+          (VortexAggregateFunction) SerializationUtils.deserialize(
+              taskletAggregationRequest.getSerializedAggregateFunction().array());
+      final VortexFunction functionForAggregation =
+          (VortexFunction) SerializationUtils.deserialize(
+              taskletAggregationRequest.getSerializedUserFunction().array());
+      vortexRequest = new TaskletAggregationRequest<>(taskletAggregationRequest.getAggregateFunctionId(),
+          aggregateFunction, functionForAggregation);
+      break;
     case ExecuteTasklet:
       final AvroTaskletExecutionRequest taskletExecutionRequest =
           (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest();
@@ -211,7 +274,7 @@ public final class VortexAvroUtils {
    * @param bytes Byte array to deserialize.
    * @return De-serialized WorkerReport.
    */
-  public static WorkerReport toWorkerReport(final byte[] bytes) {
+  public WorkerReport toWorkerReport(final byte[] bytes) {
     final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
     final List<TaskletReport> workerTaskletReports = new ArrayList<>();
 
@@ -221,32 +284,32 @@ public final class VortexAvroUtils {
       switch (avroTaskletReport.getReportType()) {
       case TaskletResult:
         final AvroTaskletResultReport taskletResultReport =
-            (AvroTaskletResultReport)avroTaskletReport.getTaskletReport();
+            (AvroTaskletResultReport) avroTaskletReport.getTaskletReport();
         taskletReport = new TaskletResultReport(taskletResultReport.getTaskletId(),
             taskletResultReport.getSerializedOutput().array());
         break;
       case TaskletAggregationResult:
         final AvroTaskletAggregationResultReport taskletAggregationResultReport =
-            (AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport();
+            (AvroTaskletAggregationResultReport) avroTaskletReport.getTaskletReport();
         taskletReport =
             new TaskletAggregationResultReport(taskletAggregationResultReport.getTaskletIds(),
                 taskletAggregationResultReport.getSerializedOutput().array());
         break;
       case TaskletCancelled:
         final AvroTaskletCancelledReport taskletCancelledReport =
-            (AvroTaskletCancelledReport)avroTaskletReport.getTaskletReport();
+            (AvroTaskletCancelledReport) avroTaskletReport.getTaskletReport();
         taskletReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
         break;
       case TaskletFailure:
         final AvroTaskletFailureReport taskletFailureReport =
-            (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport();
+            (AvroTaskletFailureReport) avroTaskletReport.getTaskletReport();
         final Exception exception =
             (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
         taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
         break;
       case TaskletAggregationFailure:
         final AvroTaskletAggregationFailureReport taskletAggregationFailureReport =
-            (AvroTaskletAggregationFailureReport)avroTaskletReport.getTaskletReport();
+            (AvroTaskletAggregationFailureReport) avroTaskletReport.getTaskletReport();
         final Exception aggregationException =
             (Exception) SerializationUtils.deserialize(
                 taskletAggregationFailureReport.getSerializedException().array());
@@ -270,7 +333,7 @@ public final class VortexAvroUtils {
    * @param <T> Type of the Avro object.
    * @return Serialized byte array.
    */
-  private static <T> byte[] toBytes(final T avroObject, final Class<T> theClass) {
+  private <T> byte[] toBytes(final T avroObject, final Class<T> theClass) {
     final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass);
     final byte[] theBytes;
     try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
@@ -292,7 +355,7 @@ public final class VortexAvroUtils {
    * @param <T> Type of the Avro object.
    * @return Avro object de-serialized from byte array.
    */
-  private static <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) {
+  private <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) {
     final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
     final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass);
     try {
@@ -301,10 +364,4 @@ public final class VortexAvroUtils {
       throw new RuntimeException(e);
     }
   }
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private VortexAvroUtils() {
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
index 133b007..18f44e0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
@@ -29,16 +29,13 @@ public interface VortexRequest {
    * Type of Request.
    */
   enum RequestType {
+    AggregateTasklets,
     ExecuteTasklet,
-    CancelTasklet
+    CancelTasklet,
+    ExecuteAggregateTasklet
   }
 
   /**
-   * @return the ID of the VortexTasklet associated with this VortexRequest.
-   */
-  int getTaskletId();
-
-  /**
    * @return the type of this VortexRequest.
    */
   RequestType getType();

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
deleted file mode 100644
index 9b1da7a..0000000
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.driver;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.vortex.api.VortexAggregateFunction;
-
-import javax.annotation.concurrent.ThreadSafe;
-import javax.inject.Inject;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A repository for {@link VortexAggregateFunction}, used to pass functions between {@link VortexMaster} and
- * {@link RunningWorkers}.
- */
-@ThreadSafe
-@Unstable
-@Private
-public final class AggregateFunctionRepository {
-  private final ConcurrentMap<Integer, VortexAggregateFunction> aggregateFunctionMap = new ConcurrentHashMap<>();
-
-  @Inject
-  private AggregateFunctionRepository() {
-  }
-
-  VortexAggregateFunction put(final int aggregateFunctionId, final VortexAggregateFunction function) {
-    return aggregateFunctionMap.put(aggregateFunctionId, function);
-  }
-
-  VortexAggregateFunction get(final int aggregateFunctionId) {
-    return aggregateFunctionMap.get(aggregateFunctionId);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index 55aeb5a..0ce2117 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -94,7 +94,7 @@ final class DefaultVortexMaster implements VortexMaster {
                       final VortexFunction<TInput, TOutput> vortexFunction, final List<TInput> inputs,
                       final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback) {
     final int aggregateFunctionId = aggregateIdCounter.getAndIncrement();
-    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction);
+    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, vortexFunction);
     final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec();
     final List<Tasklet> tasklets = new ArrayList<>(inputs.size());
     final Map<Integer, TInput> taskletIdInputMap = new HashMap<>(inputs.size());

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index 2bddc18..226ee34 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -20,9 +20,9 @@ package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.common.AggregateFunctionRepository;
 
 import javax.inject.Inject;
 
@@ -159,11 +159,14 @@ final class RunningWorkers {
 
         final Optional<Integer> taskletAggFunctionId =  tasklet.getAggregateFunctionId();
         final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get());
+
         if (taskletAggFunctionId.isPresent() &&
             !workerHasAggregateFunction(vortexWorkerManager.getId(), taskletAggFunctionId.get())) {
-          // TODO[JIRA REEF-1130]: fetch aggregate function from repo and send aggregate function to worker.
-          throw new NotImplementedException("Serialize aggregate function to worker if it doesn't have it. " +
-              "Complete in REEF-1130.");
+
+          // This assumes that all aggregate tasklets share the same user function.
+          vortexWorkerManager.sendAggregateFunction(taskletAggFunctionId.get(),
+              aggregateFunctionRepository.getAggregateFunction(taskletAggFunctionId.get()), tasklet.getUserFunction());
+          workerAggregateFunctionMap.get(vortexWorkerManager.getId()).add(taskletAggFunctionId.get());
         }
 
         vortexWorkerManager.launchTasklet(tasklet);

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index f581f99..ca58174 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -64,6 +64,7 @@ final class VortexDriver {
   private final EStage<VortexStart> vortexStartEStage;
   private final VortexStart vortexStart;
   private final EStage<Integer> pendingTaskletSchedulerEStage;
+  private final VortexAvroUtils vortexAvroUtils;
 
   @Inject
   private VortexDriver(final EvaluatorRequestor evaluatorRequestor,
@@ -72,6 +73,7 @@ final class VortexDriver {
                        final VortexStart vortexStart,
                        final VortexStartExecutor vortexStartExecutor,
                        final PendingTaskletLauncher pendingTaskletLauncher,
+                       final VortexAvroUtils vortexAvroUtils,
                        @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem,
                        @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum,
                        @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores,
@@ -79,6 +81,7 @@ final class VortexDriver {
     this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, numOfStartThreads);
     this.vortexStart = vortexStart;
     this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletLauncher, 1);
+    this.vortexAvroUtils = vortexAvroUtils;
     this.evaluatorRequestor = evaluatorRequestor;
     this.vortexMaster = vortexMaster;
     this.vortexRequestor = vortexRequestor;
@@ -151,7 +154,7 @@ final class VortexDriver {
     @Override
     public void onNext(final TaskMessage taskMessage) {
       final String workerId = taskMessage.getId();
-      final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
+      final WorkerReport workerReport = vortexAvroUtils.toWorkerReport(taskMessage.get());
       vortexMaster.workerReported(workerId, workerReport);
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
index b94b5b0..4aabf32 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
@@ -33,18 +33,30 @@ import java.util.concurrent.Executors;
 @DriverSide
 class VortexRequestor {
   private final ExecutorService executorService = Executors.newCachedThreadPool();
+  private final VortexAvroUtils vortexAvroUtils;
 
   @Inject
-  VortexRequestor() {
+  VortexRequestor(final VortexAvroUtils vortexAvroUtils) {
+    this.vortexAvroUtils = vortexAvroUtils;
   }
 
-  void send(final RunningTask reefTask, final VortexRequest vortexRequest) {
+  /**
+   * Sends a {@link VortexRequest} asynchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+   */
+  void sendAsync(final RunningTask reefTask, final VortexRequest vortexRequest) {
     executorService.execute(new Runnable() {
       @Override
       public void run() {
         //  Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster
-        reefTask.send(VortexAvroUtils.toBytes(vortexRequest));
+        send(reefTask, vortexRequest);
       }
     });
   }
+
+  /**
+   * Sends a {@link VortexRequest} synchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+   */
+  void send(final RunningTask reefTask, final VortexRequest vortexRequest) {
+    reefTask.send(vortexAvroUtils.toBytes(vortexRequest));
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index 88911e3..478bf81 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -21,6 +21,10 @@ package org.apache.reef.vortex.driver;
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.common.TaskletAggregateExecutionRequest;
+import org.apache.reef.vortex.common.TaskletAggregationRequest;
 import org.apache.reef.vortex.common.TaskletCancellationRequest;
 import org.apache.reef.vortex.common.TaskletExecutionRequest;
 
@@ -41,17 +45,49 @@ class VortexWorkerManager {
     this.reefTask = reefTask;
   }
 
+  /**
+   * Sends a {@link VortexAggregateFunction} and its {@link VortexFunction} to a
+   * {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+   */
+  <TInput, TOutput> void sendAggregateFunction(final int aggregateFunctionId,
+                                               final VortexAggregateFunction<TOutput> aggregateFunction,
+                                               final VortexFunction<TInput, TOutput> function) {
+    final TaskletAggregationRequest<TInput, TOutput> taskletAggregationRequest =
+        new TaskletAggregationRequest<>(aggregateFunctionId, aggregateFunction, function);
+
+    // The send is synchronous such that we make sure that the aggregate function is sent to the
+    // target worker before attempting to launch an aggregateable tasklet on it.
+    vortexRequestor.send(reefTask, taskletAggregationRequest);
+  }
+
+
+  /**
+   * Sends a request to launch a Tasklet on a {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+   */
   <TInput, TOutput> void launchTasklet(final Tasklet<TInput, TOutput> tasklet) {
     assert !runningTasklets.containsKey(tasklet.getId());
     runningTasklets.put(tasklet.getId(), tasklet);
-    final TaskletExecutionRequest<TInput, TOutput> taskletExecutionRequest
-        = new TaskletExecutionRequest<>(tasklet.getId(), tasklet.getUserFunction(), tasklet.getInput());
-    vortexRequestor.send(reefTask, taskletExecutionRequest);
+
+    if (tasklet.getAggregateFunctionId().isPresent()) {
+      // function is aggregateable.
+      final TaskletAggregateExecutionRequest<TInput> taskletAggregateExecutionRequest =
+          new TaskletAggregateExecutionRequest<>(tasklet.getId(), tasklet.getAggregateFunctionId().get(),
+              tasklet.getInput());
+      vortexRequestor.sendAsync(reefTask, taskletAggregateExecutionRequest);
+    } else {
+      // function is not aggregateable.
+      final TaskletExecutionRequest<TInput, TOutput> taskletExecutionRequest
+          = new TaskletExecutionRequest<>(tasklet.getId(), tasklet.getUserFunction(), tasklet.getInput());
+      vortexRequestor.sendAsync(reefTask, taskletExecutionRequest);
+    }
   }
 
+  /**
+   * Sends a request to cancel a Tasklet on a {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+   */
   void cancelTasklet(final int taskletId) {
     final TaskletCancellationRequest cancellationRequest = new TaskletCancellationRequest(taskletId);
-    vortexRequestor.send(reefTask, cancellationRequest);
+    vortexRequestor.sendAsync(reefTask, cancellationRequest);
   }
 
   List<Tasklet> taskletsDone(final List<Integer> taskletIds) {

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
new file mode 100644
index 0000000..5687e0b
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.evaluator;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.common.*;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A container for tasklet aggregation, used to preserve output from individual
+ * {@link org.apache.reef.vortex.api.VortexFunction}s and to trigger
+ * {@link org.apache.reef.vortex.api.VortexAggregateFunction}s on the pooled outputs.
+ */
+@Private
+@DriverSide
+@Unstable
+final class AggregateContainer {
+
+  private final Object stateLock = new Object();
+  private final TaskletAggregationRequest taskletAggregationRequest;
+
+  @GuardedBy("stateLock")
+  private final List<Pair<Integer, Object>> completedTasklets = new ArrayList<>();
+
+  @GuardedBy("stateLock")
+  private final List<Pair<Integer, Exception>> failedTasklets = new ArrayList<>();
+
+  AggregateContainer(final TaskletAggregationRequest taskletAggregationRequest) {
+    this.taskletAggregationRequest = taskletAggregationRequest;
+  }
+
+  public TaskletAggregationRequest getTaskletAggregationRequest() {
+    return taskletAggregationRequest;
+  }
+
+  /**
+   * Performs the output aggregation and generates the {@link WorkerReport} to report back to the
+   * {@link org.apache.reef.vortex.driver.VortexDriver}.
+   */
+  public Optional<WorkerReport> aggregateTasklets() {
+    final List<TaskletReport> taskletReports = new ArrayList<>();
+    final List<Object> results = new ArrayList<>();
+    final List<Integer> aggregatedTasklets = new ArrayList<>();
+
+    // Synchronization to prevent duplication of work on the same aggregation function on the same worker.
+    synchronized (stateLock) {
+      // Add the successful tasklets for aggregation.
+      for (final Pair<Integer, Object> resultPair : completedTasklets) {
+        aggregatedTasklets.add(resultPair.getLeft());
+        results.add(resultPair.getRight());
+      }
+
+      // Add failed tasklets to worker report.
+      for (final Pair<Integer, Exception> failedPair : failedTasklets) {
+        taskletReports.add(new TaskletFailureReport(failedPair.getLeft(), failedPair.getRight()));
+      }
+
+      // Drain the tasklets.
+      completedTasklets.clear();
+      failedTasklets.clear();
+    }
+
+    if (!results.isEmpty()) {
+      // Run the aggregation function.
+      try {
+        final byte[] aggregationResult = taskletAggregationRequest.executeAggregation(results);
+        taskletReports.add(new TaskletAggregationResultReport(aggregatedTasklets, aggregationResult));
+      } catch (final Exception e) {
+        taskletReports.add(new TaskletAggregationFailureReport(aggregatedTasklets, e));
+      }
+    }
+
+    return taskletReports.isEmpty() ? Optional.<WorkerReport>empty() : Optional.of(new WorkerReport(taskletReports));
+  }
+
+  /**
+   * Reported when an associated tasklet is complete and adds it to the completion pool.
+   */
+  public void taskletComplete(final int taskletId, final Object result) {
+    synchronized (stateLock) {
+      completedTasklets.add(new ImmutablePair<>(taskletId, result));
+    }
+  }
+
+  /**
+   * Reported when an associated tasklet has failed and adds it to the failure pool.
+   */
+  public void taskletFailed(final int taskletId, final Exception e) {
+    synchronized (stateLock) {
+      failedTasklets.add(new ImmutablePair<>(taskletId, e));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 3390c22..897d6e9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -30,6 +30,7 @@ import org.apache.reef.task.events.CloseEvent;
 import org.apache.reef.task.events.DriverMessage;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.common.*;
+import org.apache.reef.vortex.common.AggregateFunctionRepository;
 import org.apache.reef.vortex.driver.VortexWorkerConf;
 import org.apache.reef.wake.EventHandler;
 
@@ -53,15 +54,22 @@ public final class VortexWorker implements Task, TaskMessageSource {
 
   private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>();
   private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>();
+  private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>();
 
+  private final AggregateFunctionRepository aggregateFunctionRepository;
+  private final VortexAvroUtils vortexAvroUtils;
   private final HeartBeatTriggerManager heartBeatTriggerManager;
   private final int numOfThreads;
   private final CountDownLatch terminated = new CountDownLatch(1);
 
   @Inject
   private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager,
+                       final AggregateFunctionRepository aggregateFunctionRepository,
+                       final VortexAvroUtils vortexAvroUtils,
                        @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
     this.heartBeatTriggerManager = heartBeatTriggerManager;
+    this.aggregateFunctionRepository = aggregateFunctionRepository;
+    this.vortexAvroUtils = vortexAvroUtils;
     this.numOfThreads = numOfThreads;
   }
 
@@ -76,6 +84,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
 
     // Scheduling thread starts
     schedulerThread.execute(new Runnable() {
+      @SuppressWarnings("InfiniteLoopStatement") // Scheduler is supposed to run forever.
       @Override
       public void run() {
         while (true) {
@@ -88,63 +97,31 @@ public final class VortexWorker implements Task, TaskMessageSource {
           }
 
           // Command Executor: Deserialize the command
-          final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
+          final VortexRequest vortexRequest = vortexAvroUtils.toVortexRequest(message);
 
           switch (vortexRequest.getType()) {
+            case AggregateTasklets:
+              final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest;
+              aggregates.put(taskletAggregationRequest.getAggregateFunctionId(),
+                  new AggregateContainer(taskletAggregationRequest));
+
+              // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to
+              // convert inputs and functions into a VortexRequest on subsequent messages requesting to
+              // execute the aggregateable tasklets.
+              aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(),
+                  taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction());
+
+              break;
+            case ExecuteAggregateTasklet:
+              executeAggregateTasklet(commandExecutor, vortexRequest);
+              break;
             case ExecuteTasklet:
-              final CountDownLatch latch = new CountDownLatch(1);
-
-              // Scheduler Thread: Pass the command to the worker thread pool to be executed
-              // Record future to support cancellation.
-              futures.put(
-                  vortexRequest.getTaskletId(),
-                  commandExecutor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                      final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
-                      final WorkerReport workerReport;
-                      final List<TaskletReport> taskletReports = new ArrayList<>();
-
-                      try {
-                        // Command Executor: Execute the command
-                        final TaskletReport taskletReport =
-                            new TaskletResultReport(taskletExecutionRequest.getTaskletId(),
-                                taskletExecutionRequest.execute());
-                        taskletReports.add(taskletReport);
-                      } catch (final InterruptedException ex) {
-                        // Assumes that user's thread follows convention that cancelled Futures
-                        // should throw InterruptedException.
-                        final TaskletReport taskletReport =
-                            new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
-                        LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId());
-                        taskletReports.add(taskletReport);
-                      } catch (Exception e) {
-                        // Command Executor: Tasklet throws an exception
-                        final TaskletReport taskletReport =
-                            new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
-                        taskletReports.add(taskletReport);
-                      }
-
-                      workerReport = new WorkerReport(taskletReports);
-                      workerReports.addLast(VortexAvroUtils.toBytes(workerReport));
-                      try {
-                        latch.await();
-                      } catch (final InterruptedException e) {
-                        LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
-                        throw new RuntimeException(e);
-                      }
-
-                      futures.remove(vortexRequest.getTaskletId());
-                      heartBeatTriggerManager.triggerHeartBeat();
-                    }
-                  }));
-
-              // Signal that future is put.
-              latch.countDown();
+              executeTasklet(commandExecutor, futures, vortexRequest);
               break;
             case CancelTasklet:
-              LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", vortexRequest.getTaskletId());
-              final Future future = futures.get(vortexRequest.getTaskletId());
+              final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) vortexRequest;
+              LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId());
+              final Future future = futures.get(cancellationRequest.getTaskletId());
               if (future != null) {
                 future.cancel(true);
               }
@@ -161,6 +138,100 @@ public final class VortexWorker implements Task, TaskMessageSource {
   }
 
   /**
+   * Executes a tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
+   */
+  private void executeTasklet(final ExecutorService commandExecutor,
+                              final ConcurrentMap<Integer, Future> futures,
+                              final VortexRequest vortexRequest) {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+
+    // Scheduler Thread: Pass the command to the worker thread pool to be executed
+    // Record future to support cancellation.
+    futures.put(
+        taskletExecutionRequest.getTaskletId(),
+        commandExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            final WorkerReport workerReport;
+            final List<TaskletReport> taskletReports = new ArrayList<>();
+
+            try {
+              // Command Executor: Execute the command
+              final TaskletReport taskletReport =
+                  new TaskletResultReport(taskletExecutionRequest.getTaskletId(),
+                      taskletExecutionRequest.execute());
+              taskletReports.add(taskletReport);
+            } catch (final InterruptedException ex) {
+              // Assumes that user's thread follows convention that cancelled Futures
+              // should throw InterruptedException.
+              final TaskletReport taskletReport =
+                  new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
+              LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled",
+                  taskletExecutionRequest.getTaskletId());
+              taskletReports.add(taskletReport);
+            } catch (Exception e) {
+              // Command Executor: Tasklet throws an exception
+              final TaskletReport taskletReport =
+                  new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
+              taskletReports.add(taskletReport);
+            }
+
+            workerReport = new WorkerReport(taskletReports);
+            workerReports.addLast(vortexAvroUtils.toBytes(workerReport));
+            try {
+              latch.await();
+            } catch (final InterruptedException e) {
+              LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
+              throw new RuntimeException(e);
+            }
+
+            futures.remove(taskletExecutionRequest.getTaskletId());
+            heartBeatTriggerManager.triggerHeartBeat();
+          }
+        }));
+
+    // Signal that future is put.
+    latch.countDown();
+  }
+
+  /**
+   * Executes an aggregateable tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
+   */
+  private void executeAggregateTasklet(final ExecutorService commandExecutor,
+                                       final VortexRequest vortexRequest) {
+    final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
+        (TaskletAggregateExecutionRequest) vortexRequest;
+
+    assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId());
+
+    final AggregateContainer aggregateContainer = aggregates.get(
+        taskletAggregateExecutionRequest.getAggregateFunctionId());
+    final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest();
+
+    commandExecutor.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput());
+          aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result);
+        } catch (final Exception e) {
+          aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e);
+        }
+
+        // TODO[JIRA REEF-1131]: Call according to aggregate policies.
+        final Optional<WorkerReport> workerReport = aggregateContainer.aggregateTasklets();
+
+        // Add to worker report only if there is something to report back.
+        if (workerReport.isPresent()) {
+          workerReports.addLast(vortexAvroUtils.toBytes(workerReport.get()));
+          heartBeatTriggerManager.triggerHeartBeat();
+        }
+      }
+    });
+  }
+
+  /**
    * @return the workerReport the worker wishes to send.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
new file mode 100644
index 0000000..391de6e
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.examples.sumones;
+
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.vortex.api.VortexAggregateException;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+
+import java.util.List;
+
+/**
+ * Aggregates and sums the outputs.
+ */
+public final class AdditionAggregateFunction implements VortexAggregateFunction<Integer> {
+  private static final Codec<Integer> CODEC = new SerializableCodec<>();
+
+  @Override
+  public Integer call(final List<Integer> taskletOutputs) throws VortexAggregateException {
+    int sum = 0;
+    for (final int output : taskletOutputs) {
+      sum += output;
+    }
+
+    return sum;
+  }
+
+  @Override
+  public Codec<Integer> getOutputCodec() {
+    return CODEC;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
new file mode 100644
index 0000000..b7a69ef
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.examples.sumones;
+
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.vortex.api.VortexFunction;
+
+/**
+ * Identity function.
+ */
+public final class IdentityFunction implements VortexFunction<Integer, Integer> {
+  private static final Codec<Integer> CODEC = new SerializableCodec<>();
+
+  /**
+   * Outputs input.
+   */
+  @Override
+  public Integer call(final Integer input) throws Exception {
+    return input;
+  }
+
+  @Override
+  public Codec<Integer> getInputCodec() {
+    return CODEC;
+  }
+
+  @Override
+  public Codec<Integer> getOutputCodec() {
+    return CODEC;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java
new file mode 100644
index 0000000..7ba8b10
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.examples.sumones;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.vortex.driver.VortexJobConf;
+import org.apache.reef.vortex.driver.VortexLauncher;
+import org.apache.reef.vortex.driver.VortexMasterConf;
+
+/**
+ * User's main function.
+ */
+final class SumOnes {
+  private SumOnes() {
+  }
+
+  /**
+   * Launch the vortex job, passing appropriate arguments.
+   */
+  public static void main(final String[] args) {
+    final Configuration vortexMasterConf = VortexMasterConf.CONF
+        .set(VortexMasterConf.WORKER_NUM, 2)
+        .set(VortexMasterConf.WORKER_MEM, 1024)
+        .set(VortexMasterConf.WORKER_CORES, 4)
+        .set(VortexMasterConf.WORKER_CAPACITY, 2000)
+        .set(VortexMasterConf.VORTEX_START, SumOnesAggregateStart.class)
+        .build();
+
+    final Configuration userConf = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(NumberOfOnes.class, "1000")
+        .build();
+
+    final VortexJobConf vortexJobConf = VortexJobConf.newBuilder()
+        .setJobName("Vortex_Example_SumOnes")
+        .setVortexMasterConf(vortexMasterConf)
+        .setUserConf(userConf)
+        .build();
+
+    VortexLauncher.launchLocal(vortexJobConf);
+  }
+
+  @NamedParameter(doc = "numbers of ones to sum")
+  public static class NumberOfOnes implements Name<Integer> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
new file mode 100644
index 0000000..bd09565
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.examples.sumones;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.vortex.api.*;
+
+import javax.inject.Inject;
+import java.util.Vector;
+
+/**
+ * SumOnes User Code Example.
+ * Submits a vector of ones to Vortex together with an aggregate function and checks
+ * that the aggregated sums add up to the number of ones that were submitted.
+ */
+final class SumOnesAggregateStart implements VortexStart {
+
+  /**
+   * How many ones to submit; injected through {@link SumOnes.NumberOfOnes}.
+   */
+  private final int numbers;
+
+  @Inject
+  private SumOnesAggregateStart(@Parameter(SumOnes.NumberOfOnes.class) final int numbers) {
+    this.numbers = numbers;
+  }
+
+  /**
+   * Perform a simple sum and aggregation of ones on Vortex.
+   * Every aggregation result obtained from the future is checked against the sum of
+   * the inputs it reports as aggregated, and then added to a running total; once no
+   * further result is announced the total is compared with {@code numbers}.
+   *
+   * @param vortexThreadPool the pool the aggregate job is submitted to
+   * @throws RuntimeException if an aggregation result carries a
+   *     {@link VortexAggregateException}, or if the thread is interrupted while
+   *     obtaining a result (the interrupt flag is restored before throwing)
+   */
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final Vector<Integer> inputVector = new Vector<>();
+    for (int i = 0; i < numbers; i++) {
+      inputVector.add(1);
+    }
+
+    final VortexAggregateFuture<Integer, Integer> future =
+        vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), inputVector);
+
+    try {
+      int allSum = 0;
+      AggregateResultSynchronous<Integer, Integer> result;
+
+      // do/while so that the very first result is accounted for as well. Fetching it
+      // ahead of a while loop that immediately fetches again would drop its sum from
+      // allSum. The number of future.get() calls stays the same as before.
+      do {
+        result = future.get();
+        final int sumResult;
+
+        try {
+          sumResult = result.getAggregateResult();
+        } catch (final VortexAggregateException e) {
+          throw new RuntimeException(e);
+        }
+
+        int sumInputs = 0;
+        for (final int input : result.getAggregatedInputs()) {
+          sumInputs += input;
+        }
+
+        // The inputs are all ones passed through IdentityFunction, so the aggregate
+        // reported for a batch has to equal the plain sum of that batch's inputs.
+        assert sumResult == sumInputs;
+
+        allSum += sumResult;
+      } while (result.hasNext());
+
+      assert allSum == numbers;
+
+    } catch (final InterruptedException ie) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ie);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java
new file mode 100644
index 0000000..001e5e9
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * A simple Vortex addition and aggregation example.
+ */
+package org.apache.reef.vortex.examples.sumones;
\ No newline at end of file