You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:45 UTC

[44/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java b/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java
deleted file mode 100644
index ca26a09..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.concurrent.ThreadFactory;
-
-public abstract class ProcessSource<T> extends Source<T>implements Runnable {
-
-    @Override
-    public void start() {
-        Thread t = getOpletContext().getService(ThreadFactory.class).newThread(this);
-        t.setDaemon(false);
-        t.start();
-    }
-
-    protected Runnable getRunnable() {
-        return this;
-    }
-
-    protected abstract void process() throws Exception;
-
-    @Override
-    public void run() {
-        try {
-            process();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Sink.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Sink.java b/api/oplet/src/main/java/edgent/oplet/core/Sink.java
deleted file mode 100644
index f8f087d..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Sink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.function.Functions;
-
-/**
- * Sink a stream by processing each tuple through
- * a {@link Consumer}.
- * If the {@code sinker} function implements {@code AutoCloseable}
- * then when this oplet is closed {@code sinker.close()} is called.
- *
- * @param <T> Tuple type.
- */
-public class Sink<T> extends AbstractOplet<T, Void> {
-    
-    private Consumer<T> sinker;
-
-    /**
-     * Create a  {@code Sink} that discards all tuples.
-     * The sink function can be changed using
-     * {@link #setSinker(Consumer)}.
-     */
-    public Sink() {
-        setSinker(Functions.discard());
-    }
-    
-    /**
-     * Create a {@code Sink} oplet.
-     * @param sinker Processing to be performed on each tuple.
-     */
-    public Sink(Consumer<T> sinker) {
-        setSinker(sinker);
-    }
-
-    @Override
-    public List<Consumer<T>> getInputs() {
-        return Collections.singletonList(getSinker());
-    }
-
-    @Override
-    public void start() {
-    }
-    
-    @Override
-    public void close() throws Exception {
-        closeFunction(getSinker());
-    }
-    
-    /**
-     * Set the sink function.
-     * @param sinker Processing to be performed on each tuple.
-     */
-    protected void setSinker(Consumer<T> sinker) {
-        this.sinker = sinker;
-    }
-    
-    /**
-     * Get the sink function that processes each tuple.
-     * @return function that processes each tuple.
-     */
-    protected Consumer<T> getSinker() {
-        return sinker;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Source.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Source.java b/api/oplet/src/main/java/edgent/oplet/core/Source.java
deleted file mode 100644
index b7c1dcc..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Source.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.oplet.OpletContext;
-
-public abstract class Source<T> extends AbstractOplet<Void, T> {
-
-    private Consumer<T> destination;
-
-    @Override
-    public void initialize(OpletContext<Void, T> context) {
-        super.initialize(context);
-
-        destination = context.getOutputs().get(0);
-    }
-
-    protected Consumer<T> getDestination() {
-        return destination;
-    }
-    
-    /**
-     * Submit a tuple to single output.
-     * @param tuple Tuple to be submitted.
-     */
-    protected void submit(T tuple) {
-        getDestination().accept(tuple);
-    }
-
-    @Override
-    public final List<Consumer<Void>> getInputs() {
-        return Collections.emptyList();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Split.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Split.java b/api/oplet/src/main/java/edgent/oplet/core/Split.java
deleted file mode 100644
index 0480f27..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Split.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.function.ToIntFunction;
-import edgent.oplet.OpletContext;
-
-/**
- * Split a stream into multiple streams depending
- * on the result of a splitter function.
- * <BR>
- * For each tuple a function is called:
- * <UL>
- * <LI>If the return is negative the tuple is dropped.</LI>
- * <LI>Otherwise the return value is modded by the number of
- * output ports and the result is the output port index
- * the tuple is submitted to.</LI>
- * </UL>
- *
- * @param <T> Type of the tuple.
- */
-public class Split<T> extends AbstractOplet<T, T> implements Consumer<T> {
-
-    private static final long serialVersionUID = 1L;
-    private final ToIntFunction<T> splitter;
-    private List<? extends Consumer<T>> destinations;
-    private int n;
-    
-    public Split(ToIntFunction<T> splitter) {
-        this.splitter = splitter;
-    }
-    
-
-    @Override
-    public void initialize(OpletContext<T, T> context) {
-        super.initialize(context);
-
-        destinations = context.getOutputs();
-        n = destinations.size();
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public List<Consumer<T>> getInputs() {
-        return Collections.singletonList(this);
-    }
-
-    @Override
-    public void accept(T tuple) {
-        int s = splitter.applyAsInt(tuple);
-        if (s >= 0)
-            destinations.get(s % n).accept(tuple);
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(splitter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Union.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Union.java b/api/oplet/src/main/java/edgent/oplet/core/Union.java
deleted file mode 100644
index 0237246..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Union.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import edgent.oplet.OpletContext;
-
-/**
- * Union oplet, merges multiple input ports
- * into a single output port.
- * 
- * Processing for each input is identical
- * and just submits the tuple to the single output.
- * 
- * @param <T> Tuple type
- */
-public final class Union<T> extends FanIn<T, T> {
-
-    @Override
-    public void start() {
-    }
-
-    
-    @Override
-    public void initialize(OpletContext<T, T> context) {
-      super.initialize(context);
-      
-      // forward every tuple to the output port
-      setReceiver((tuple, iportIndex) -> tuple);
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java b/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java
deleted file mode 100644
index 5ea10b6..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Management beans for core oplets.
- */
-package edgent.oplet.core.mbeans;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/package-info.java b/api/oplet/src/main/java/edgent/oplet/core/package-info.java
deleted file mode 100644
index b208400..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Core primitive oplets.
- */
-package edgent.oplet.core;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Events.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Events.java b/api/oplet/src/main/java/edgent/oplet/functional/Events.java
deleted file mode 100644
index d24e4a0..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Events.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import edgent.function.Consumer;
-import edgent.oplet.core.Source;
-
-/**
- * Generate tuples from events.
- * This oplet implements {@link Consumer} which
- * can be called directly from an event handler,
- * listener or callback. 
- * 
- * @param <T> Data container type for output tuples.
- */
-public class Events<T> extends Source<T>implements Consumer<T> {
-
-    private static final long serialVersionUID = 1L;
-    private Consumer<Consumer<T>> eventSetup;
-
-    public Events(Consumer<Consumer<T>> eventSetup) {
-        this.eventSetup = eventSetup;
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(eventSetup);
-    }
-
-    @Override
-    public void start() {
-        // Allocate a thread so the job containing this oplet
-        // doesn't look "complete" and shutdown.
-        Thread endlessEventSource = getOpletContext()
-                .getService(ThreadFactory.class)
-                .newThread(() -> {
-                    try {
-                        Thread.sleep(Long.MAX_VALUE);
-                    }
-                    catch (InterruptedException e) {
-                        // cancelled; we're done.
-                    }
-                });
-        endlessEventSource.setDaemon(false);
-        endlessEventSource.start();
-
-        // It's possible for uses to do things like a blocking connect
-        // to an external system from eventSetup.accept() so run it as
-        // a task to avoid start() / submit-job timeouts.
-        getOpletContext().getService(ScheduledExecutorService.class)
-                    .submit(() -> eventSetup.accept(this));
-    }
-
-    @Override
-    public void accept(T tuple) {
-        submit(tuple);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Filter.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Filter.java b/api/oplet/src/main/java/edgent/oplet/functional/Filter.java
deleted file mode 100644
index 4ce1590..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Filter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Predicate;
-import edgent.oplet.core.Pipe;
-
-public class Filter<T> extends Pipe<T, T> {
-    private static final long serialVersionUID = 1L;
-    private Predicate<T> filter;
-
-    public Filter(Predicate<T> filter) {
-        this.filter = filter;
-    }
-
-    @Override
-    public void accept(T tuple) {
-        if (filter.test(tuple))
-            submit(tuple);
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(filter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java b/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java
deleted file mode 100644
index f1cc82b..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Function;
-import edgent.oplet.core.Pipe;
-
-/**
- * 
- * Map an input tuple to 0-N output tuples.
- * 
- * Uses a function that returns an iterable
- * to map the input tuple. The return value
- * of the function's apply method is
- * iterated through with each returned
- * value being submitted as an output tuple.
- * 
- * 
- * @param <I>
- *            Data container type for input tuples.
- * @param <O>
- *            Data container type for output tuples.
- */
-public class FlatMap<I, O> extends Pipe<I, O> {
-	private static final long serialVersionUID = 1L;
-	
-	private Function<I, Iterable<O>> function;
-
-    public FlatMap(Function<I, Iterable<O>> function) {
-        this.function = function;
-    }
-
-    @Override
-    public void accept(I tuple) {
-        Iterable<O> outputs = function.apply(tuple);
-        if (outputs != null) {
-        	for (O output : outputs) {
-        		if (output != null)
-                    submit(output);
-        	}
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(function);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Map.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Map.java b/api/oplet/src/main/java/edgent/oplet/functional/Map.java
deleted file mode 100644
index 2d80c61..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Map.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Function;
-import edgent.oplet.core.Pipe;
-
-/**
- * Map an input tuple to 0-1 output tuple
- * 
- *
- * @param <I>
- *            Data container type for input tuples.
- * @param <O>
- *            Data container type for output tuples.
- */
-public class Map<I, O> extends Pipe<I, O> {
-    private static final long serialVersionUID = 1L;
-    private Function<I, O> function;
-
-    public Map(Function<I, O> function) {
-        this.function = function;
-    }
-
-    @Override
-    public void accept(I tuple) {
-        O output = function.apply(tuple);
-        if (output != null)
-            submit(output);
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(function);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Peek.java b/api/oplet/src/main/java/edgent/oplet/functional/Peek.java
deleted file mode 100644
index 992b945..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Peek.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Consumer;
-
-/**
- * Functional peek oplet.
- * 
- * Each peek calls {@code peeker.accept(tuple)}.
- *
- * @param <T> Tuple type.
- */
-public class Peek<T> extends edgent.oplet.core.Peek<T> {
-    private static final long serialVersionUID = 1L;
-    private final Consumer<T> peeker;
-
-    /**
-     * Peek oplet using a function to peek.
-     * @param peeker Function that peeks at the tuple.
-     */
-    public Peek(Consumer<T> peeker) {
-        this.peeker = peeker;
-    }
-    
-    protected Consumer<T> getPeeker() {
-      return peeker;
-    }
-
-    @Override
-    protected void peek(T tuple) {
-        peeker.accept(tuple);
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(peeker);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java b/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java
deleted file mode 100644
index 42e35cd..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.Supplier;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.PeriodicSource;
-
-public class SupplierPeriodicSource<T> extends PeriodicSource<T> {
-
-    private Supplier<T> data;
-
-    public SupplierPeriodicSource(long period, TimeUnit unit, Supplier<T> data) {
-        super(period, unit);
-        this.data = data;
-    }
-
-    @Override
-    public void initialize(OpletContext<Void, T> context) {
-        super.initialize(context);
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(data);
-    }
-
-    @Override
-    public void fetchTuples() {
-        T tuple = data.get();
-        if (tuple != null)
-            submit(tuple);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java b/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java
deleted file mode 100644
index 6bf63ee..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Supplier;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.ProcessSource;
-
-public class SupplierSource<T> extends ProcessSource<T> {
-
-    private Supplier<Iterable<T>> data;
-
-    public SupplierSource() {
-    }
-
-    public SupplierSource(Supplier<Iterable<T>> data) {
-        this.data = data;
-    }
-
-    @Override
-    public void initialize(OpletContext<Void, T> context) {
-        super.initialize(context);
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(data);
-    }
-
-    @Override
-    public void process() {
-        for (T tuple : data.get()) {
-            if (tuple != null)
-                submit(tuple);
-
-            if (Thread.currentThread().isInterrupted())
-                break;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/package-info.java b/api/oplet/src/main/java/edgent/oplet/functional/package-info.java
deleted file mode 100644
index b00ca88..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets that process tuples using functions.
- */
-package edgent.oplet.functional;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/package-info.java b/api/oplet/src/main/java/edgent/oplet/package-info.java
deleted file mode 100644
index 05b4013..0000000
--- a/api/oplet/src/main/java/edgent/oplet/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets API.
- * <P>
- * An oplet is a stream processor that can have 0-N input ports and 0-M output ports.
- * Tuples on streams connected to an oplet's input port are delivered to the oplet for processing.
- * The oplet submits tuples to its output ports which results in the tuples
- * being present on the connected streams.
- * </P>
- */
-package edgent.oplet;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java b/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java
deleted file mode 100644
index 2aff466..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-
-import edgent.function.BiFunction;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.FanIn;
-
-/**
- * A tuple synchronization barrier.
- * <P>
- * {@code Barrier} has n input ports with tuple type {@code T}
- * and one output port with tuple type {@code List<T>}.
- * Once the oplet receives one tuple on each of its input ports,
- * it generates an output tuple containing one tuple from each input port.
- * It then awaits receiving the next collection of tuples.
- * Input port 0's tuple is in list[0], port 1's tuple in list[1], and so on.
- * </P><P>
- * Each input port has an associated queue of size {@code queueCapacity}.
- * The input port's {@code Consumer<T>.accept()} will block if its queue is full.
- * </P>
- *
- * @param <T> Type of the tuple.
- */
-public class Barrier<T> extends FanIn<T, List<T>> {
-    
-    private final int queueCapacity;
-    private Thread thread;
-    private List<LinkedBlockingQueue<T>> iportQueues;
-    
-    /**
-     * Create a new instance.
-     * @param queueCapacity size of each input port's blocking queue
-     */
-    public Barrier(int queueCapacity) {
-      this.queueCapacity = queueCapacity;
-    }
-    
-    @Override
-    public void initialize(OpletContext<T, List<T>> context) {
-        super.initialize(context);
-        
-        thread = context.getService(ThreadFactory.class).newThread(() -> run());
-        
-        int numIports = getOpletContext().getInputCount();
-        iportQueues = new ArrayList<>(numIports);
-        for (int i = 0; i < numIports; i++)
-          iportQueues.add(new LinkedBlockingQueue<>(queueCapacity));
-        
-        setReceiver(receiver());
-    }
-   
-    @Override
-    public void start() {
-        thread.start();
-    }
-    
-    protected BiFunction<T,Integer,List<T>> receiver() {
-      return (tuple, iportIndex) -> {
-        accept(tuple, iportIndex);
-        return null;
-      };
-    }
-    
-    protected void accept(T tuple, int iportIndex) {
-      try {
-        iportQueues.get(iportIndex).put(tuple);
-      } catch(InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-    }
-
-    private void run() {
-        while (!Thread.interrupted()) {
-            try {
-              List<T> list = new ArrayList<>(iportQueues.size());
-              for (LinkedBlockingQueue<T> iport : iportQueues) {
-                list.add(iport.take());
-              }
-              submit(list);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-    }
-    
-    @Override
-    public void close() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java b/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java
deleted file mode 100644
index 8824076..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-
-/**
- * Isolate upstream processing from downstream
- * processing guaranteeing tuple order.
- * Input tuples are placed at the tail of a queue
- * and a dedicated thread removes them from the
- * head and is used for downstream processing.
- *
- * @param <T> Type of the tuple.
- */
-public class Isolate<T> extends Pipe<T,T> {
-    private static final long serialVersionUID = 1L;
-    
-    private Thread thread;
-    private final LinkedBlockingQueue<T> tuples;
-    
-    /**
-     * Create a new Isolate oplet.
-     * <BR>
-     * Same as Isolate(Integer.MAX_VALUE).
-     */
-    public Isolate() {
-      this(Integer.MAX_VALUE);
-    }
-    
-    /**
-     * Create a new Isolate oplet.
-     * @param queueCapacity size of the queue between the input stream
-     *          and the output stream.
-     *          {@link #accept(Object) accept} blocks when the queue is full.
-     */
-    public Isolate(int queueCapacity) {
-      tuples = new LinkedBlockingQueue<>(queueCapacity);
-    }
-    
-    @Override
-    public void initialize(OpletContext<T, T> context) {
-        super.initialize(context);
-        thread = context.getService(ThreadFactory.class).newThread(() -> run());
-    }
-   
-    @Override
-    public void start() {
-        super.start();
-        thread.start();
-    }
-
-    @Override
-    public void accept(T tuple) {
-        try {
-            tuples.put(tuple);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }      
-    }
-
-    private void run() {
-        while (!Thread.interrupted()) {
-            try {
-                submit(tuples.take());
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-    }
-    
-    @Override
-    public void close() throws Exception {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java b/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java
deleted file mode 100644
index 1b295c9..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.LinkedList;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import edgent.function.BiConsumer;
-import edgent.function.Function;
-import edgent.function.Functions;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-import edgent.window.Partition;
-import edgent.window.PartitionedState;
-import edgent.window.Policies;
-import edgent.window.Window;
-import edgent.window.Windows;
-
-/**
- * Relieve pressure on upstream oplets by discarding tuples.
- * This oplet ensures that upstream processing is not
- * constrained by any delay in downstream processing,
- * for example by a sink oplet not being able to connect
- * to its external system.
- * When downstream processing cannot keep up with the input rate
- * this oplet maintains a defined window of the most recent
- * tuples and discards any earlier tuples using arrival order.
- * <P>
- * A window partition is maintained for each key seen
- * on the input stream. Any tuple arriving on the input
- * stream is inserted into the window. Asynchronously
- * tuples are taken from the window using FIFO and
- * submitted downstream. The submission of tuples maintains
- * order within a partition but not across partitions.
- * </P>
- * <P>
- * Tuples are  <B>discarded and not</B> submitted to the
- * output port if the downstream processing cannot keep up
- * with the incoming tuple rate.
- * </P>
- * <UL>
- * <LI>For a {@link #PressureReliever(int, Function) count}
- * {@code PressureReliever} up to last (most recent) {@code N} tuples
- * are maintained in a window partition.
- * <BR> Asynchronous tuple submission removes the first (oldest) tuple in the partition
- * before submitting it downstream.
- * <BR> If when an input tuple is processed the window partition contains N tuples, then
- * the first (oldest) tuple in the partition is discarded before the input tuple is inserted into the window.
- * </UL>
- * <P>
- * Insertion of the oplet into a stream disconnects the
- * upstream processing from the downstream processing,
- * so that downstream processing is executed on a different
- * thread to the thread that processed the input tuple.
- * </P>
- * 
- * @param <T> Tuple type.
- * @param <K> Key type.
- */
-public class PressureReliever<T, K> extends Pipe<T, T> {
-    private static final long serialVersionUID = 1L;
-
-    private ScheduledExecutorService executor;
-    private final Window<T, K, LinkedList<T>> window;
-
-    /**
-     * Pressure reliever that maintains up to {@code count} most recent tuples per key.
-     *
-     * @param count Number of tuples to maintain where downstream processing cannot keep up.
-     * @param keyFunction Key function for tuples.
-     */
-    public PressureReliever(int count, Function<T, K> keyFunction) {
-        window = Windows.window(
-                Policies.alwaysInsert(),
-                Policies.countContentsPolicy(count),
-                Policies.evictOldest(),
-                new FirstSubmitter(),
-                keyFunction,
-                () -> new LinkedList<T>());
-
-        // No processing of the window takes place
-        window.registerPartitionProcessor((tuples, k) -> { });
-    }    
-
-    @Override
-    public void initialize(OpletContext<T, T> context) {
-        super.initialize(context);
-        executor = context.getService(ScheduledExecutorService.class);
-    }
-
-    @Override
-    public void accept(T tuple) {
-        window.insert(tuple);
-    }
-
-    @Override
-    public void close() throws Exception {
-    }
-
-    private class FirstSubmitter extends PartitionedState<K, AtomicBoolean>
-            implements BiConsumer<Partition<T, K, LinkedList<T>>, T> {
-
-        /**
-         * 
-         */
-        private static final long serialVersionUID = 1L;
-
-        FirstSubmitter() {
-            super(() -> new AtomicBoolean());
-        }
-
-        /**
-         * Process the window (to consume the oldest tuple in the partition)
-         * only if a tuple from this partition is not already being consumed.
-         * 
-         * @param partition partition whose oldest tuple is to be submitted
-         * @param tuple not used by this method
-         */
-        @Override
-        public void accept(Partition<T, K, LinkedList<T>> partition, T tuple) {
-            submitNextTuple(partition);
-        }
-
-        private void submitNextTuple(Partition<T, K, LinkedList<T>> partition) {
-            final K key = partition.getKey();
-            final AtomicBoolean latch = getState(key);
-            if (!latch.compareAndSet(false, true))
-                return;
-            
-            final T firstTuple;
-            synchronized (partition) {
-                final LinkedList<T> contents = partition.getContents();
-                if (contents.isEmpty()) {
-                    latch.set(false);
-                    return;
-                }
-
-                firstTuple = contents.removeFirst();
-            }
-
-            Runnable submit = Functions.delayedConsume(getDestination(), firstTuple);
-            submit = Functions.runWithFinal(submit, () -> {
-                latch.set(false);
-                submitNextTuple(partition);
-            });
-
-            executor.execute(submit);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java b/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java
deleted file mode 100644
index 9fd42b0..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.Functions;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-
-/**
- * Isolate upstream processing from downstream
- * processing without guaranteeing tuple order.
- * An executor is used for downstream processing
- * thus tuple order cannot be guaranteed as the
- * scheduler does not guarantee execution order.
- *
- * @param <T> Type of the tuple.
- */
-public class UnorderedIsolate<T> extends Pipe<T,T> {
-    private static final long serialVersionUID = 1L;
-    
-    private ScheduledExecutorService executor;
-    
-    @Override
-    public void initialize(OpletContext<T, T> context) {
-        super.initialize(context);
-        executor = context.getService(ScheduledExecutorService.class);
-    }
-
-    @Override
-    public void accept(T tuple) {
-        executor.execute(Functions.delayedConsume(getDestination(), tuple));      
-    }
-    
-    @Override
-    public void close() throws Exception {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java b/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java
deleted file mode 100644
index c844823..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets that control the flow of tuples.
- */
-package edgent.oplet.plumbing;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java b/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java
deleted file mode 100644
index 4afa83d..0000000
--- a/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.window;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-import edgent.window.Window;
-
-/**
- * Aggregate a window.
- * Window contents are aggregated by a
- * {@link BiFunction aggregator function}
- * passing the list of tuples in the window and
- * the partition key. The returned value
- * is submitted to the sole output port
- * if it is not {@code null}. 
- *
- * @param <T> Type of the input tuples.
- * @param <U> Type of the output tuples.
- * @param <K> Type of the partition key.
- */
-public class Aggregate<T,U,K> extends Pipe<T, U> {
-    private static final long serialVersionUID = 1L;
-    private final Window<T,K, ? extends List<T>> window;
-    /**
-     * The aggregator provided by the user.
-     */
-    private final BiFunction<List<T>,K, U> aggregator;
-    
-    public Aggregate(Window<T,K, ? extends List<T>> window, BiFunction<List<T>,K, U> aggregator){
-        this.aggregator = aggregator;
-        BiConsumer<List<T>, K> partProcessor = (tuples, key) -> {
-            U aggregateTuple = aggregator.apply(tuples, key);
-            if (aggregateTuple != null)
-                submit(aggregateTuple);
-            };
-            
-        window.registerPartitionProcessor(partProcessor);
-        this.window=window;
-    }
-    
-    @Override
-    public void initialize(OpletContext<T,U> context) {
-        super.initialize(context);
-        window.registerScheduledExecutorService(this.getOpletContext().getService(ScheduledExecutorService.class));
-    }
-    
-    @Override
-    public void accept(T tuple) {
-        window.insert(tuple);   
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeFunction(aggregator);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/window/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/window/package-info.java b/api/oplet/src/main/java/edgent/oplet/window/package-info.java
deleted file mode 100644
index 3f740ac..0000000
--- a/api/oplet/src/main/java/edgent/oplet/window/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets using windows.
- */
-package edgent.oplet.window;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java b/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java
new file mode 100644
index 0000000..1adc9d5
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+/**
+ * Information about an oplet invocation's job. 
+ */
+public interface JobContext {
+    /**
+     * Get the runtime identifier for the job containing this {@link Oplet}.
+     * @return The job identifier for the application being executed.
+     */
+    String getId();
+    
+    /**
+     * Get the name of the job containing this {@link Oplet}.
+     * @return The job name for the application being executed.
+     */
+    String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java b/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java
new file mode 100644
index 0000000..642ddc6
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Generic API for an oplet that processes streaming data on 0-N input ports
+ * and produces 0-M output streams on its output ports. An input port may be
+ * connected with any number of streams from other oplets. An output port may
+ * be connected to any number of input ports on other oplets.
+ *
+ * @param <I>
+ *            Data container type for input tuples.
+ * @param <O>
+ *            Data container type for output tuples.
+ */
+public interface Oplet<I, O> extends AutoCloseable {
+
+    /**
+     * Initialize the oplet.
+     * 
+     * @param context the OpletContext
+     * @throws Exception on failure
+     */
+    void initialize(OpletContext<I, O> context) throws Exception;
+
+    /**
+     * Start the oplet. Oplets must not submit any tuples not derived from
+     * input tuples until this method is called.
+     */
+    void start();
+
+    /**
+     * Get the input stream data handlers for this oplet. The number of handlers
+     * must equal the number of configured input ports. Each tuple
+     * arriving on an input port will be sent to the stream handler for that
+     * input port.
+     * 
+     * @return list of consumers
+     */
+    List<? extends Consumer<I>> getInputs();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java b/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java
new file mode 100644
index 0000000..9d7052f
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+import java.util.List;
+
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Context information for the {@code Oplet}'s invocation context.
+ * <P>
+ * At execution time an oplet uses its invocation context to retrieve 
+ * provided {@link #getService(Class) services}, 
+ * {@link #getOutputs() output ports} for tuple submission
+ * and {@link #getJobContext() job} information. 
+ *
+ * @param <I> tuple type of input streams
+ * @param <O> tuple type of output streams
+ */
+public interface OpletContext<I, O> extends RuntimeServices {
+
+	/**
+	 * Get the unique identifier (within the running job)
+	 * for this oplet.
+	 * @return unique identifier for this oplet
+	 */
+	String getId();
+
+    /**
+     * {@inheritDoc}
+     * <P>
+     * Get a service for this oplet invocation.
+     * 
+     * An invocation of an oplet may get access to services,
+     * which provide specific functionality, such as metrics.
+     * </P>
+     * 
+     */
+	@Override
+    <T> T getService(Class<T> serviceClass);
+    
+    /**
+     * Get the number of connected inputs to this oplet.
+     * @return number of connected inputs to this oplet.
+     */
+    int getInputCount();
+    
+    /**
+     * Get the number of connected outputs to this oplet.
+     * @return number of connected outputs to this oplet.
+     */
+    int getOutputCount();
+
+    /**
+     * Get the mechanism to submit tuples on an output port.
+     * 
+     * @return list of consumers
+     */
+    List<? extends Consumer<O>> getOutputs();
+
+    /**
+     * Get the oplet's output port context information.
+     * @return list of {@link OutputPortContext}, one for each output port.
+     */
+    List<OutputPortContext> getOutputContext();
+
+    /**
+     * Get the job hosting this oplet. 
+     * @return {@link JobContext} hosting this oplet invocation.
+     */
+    JobContext getJobContext();
+    
+    /**
+     * Creates a unique name within the context of the current runtime.
+     * <p>
+     * The default implementation adds a suffix composed of the package 
+     * name of this interface, the current job and oplet identifiers, 
+     * all separated by periods ({@code '.'}).  Developers should use this 
+     * method to avoid name clashes when they store or register the name in 
+     * an external container or registry.
+     *
+     * @param name name (possibly non-unique)
+     * @return unique name within the context of the current runtime.
+     */
+    String uniquify(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java b/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java
new file mode 100644
index 0000000..63fe2c8
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+/**
+ * Information about a single output port of an oplet.
+ */
+public interface OutputPortContext {
+    /**
+     * Get the alias of the output port, if it has one.
+     * @return the port's alias; {@code null} when no alias is set.
+     */
+    String getAlias();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java
new file mode 100644
index 0000000..5656e8b
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.oplet.OpletContext;
+
+/**
+ * Skeleton {@link Oplet} implementation that holds on to the
+ * {@link OpletContext} it is initialized with.
+ *
+ * @param <I> Data container type for input tuples.
+ * @param <O> Data container type for output tuples.
+ */
+public abstract class AbstractOplet<I, O> implements Oplet<I, O> {
+
+    /** Context from the most recent {@code initialize} call; unset before that. */
+    private OpletContext<I, O> opletContext;
+
+    /**
+     * Record the context so that it can later be obtained through
+     * {@link #getOpletContext()}.
+     * @param context context this oplet is being initialized with
+     */
+    @Override
+    public void initialize(OpletContext<I, O> context) {
+        opletContext = context;
+    }
+
+    /**
+     * Get the context this oplet was initialized with.
+     * @return the context passed to {@link #initialize(OpletContext)};
+     *         {@code null} if that method has not been called.
+     */
+    public final OpletContext<I, O> getOpletContext() {
+        return opletContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java
new file mode 100644
index 0000000..ecdfafb
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.oplet.OpletContext;
+
+/**
+ * Oplet that funnels tuples from several input ports into one output port.
+ * <P>
+ * Each tuple received on any input is handed to
+ * {@code receiver.apply(T tuple, Integer index)}; {@code index} is the
+ * tuple's input stream's index, with {@code this} being index 0 and
+ * {@code others} following in their order. A non-null value returned by
+ * {@code receiver} is emitted on the output stream; null emits nothing.
+ * </P>
+ *
+ * @param <T> Tuple type of input streams
+ * @param <U> Tuple type of output stream
+ */
+public class FanIn<T,U> extends AbstractOplet<T, U> {
+    private BiFunction<T, Integer, U> receiver;
+    private List<Consumer<T>> iportConsumers;
+    private Consumer<U> destination;
+
+    /**
+     * Create a FanIn whose receiver is left unset;
+     * see {@link #setReceiver(BiFunction)}.
+     */
+    public FanIn() {
+    }
+
+    /**
+     * Create a FanIn that processes tuples with {@code receiver}.
+     * @param receiver function to receive tuples
+     */
+    public FanIn(BiFunction<T, Integer, U> receiver) {
+        this.receiver = receiver;
+    }
+
+    /**
+     * Captures the first output as the destination and, the first time
+     * through, builds one consumer per connected input port.
+     */
+    @Override
+    public void initialize(OpletContext<T, U> context) {
+        super.initialize(context);
+        destination = context.getOutputs().get(0);
+
+        final int portCount = getOpletContext().getInputCount();
+        if (iportConsumers != null) {
+            return; // per-port consumers were already created
+        }
+        // One consumer per input port; each of them invokes the receiver.
+        iportConsumers = new ArrayList<>(portCount);
+        for (int port = 0; port < portCount; port++) {
+            iportConsumers.add(consumer(port));
+        }
+        iportConsumers = Collections.unmodifiableList(iportConsumers);
+    }
+
+    /**
+     * Replace the receiver function. This has to happen no later than
+     * during {@link #initialize(OpletContext)}.
+     * @param receiver function to receive tuples
+     */
+    protected void setReceiver(BiFunction<T, Integer, U> receiver) {
+        this.receiver = receiver;
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public List<? extends Consumer<T>> getInputs() {
+        return iportConsumers;
+    }
+
+    /**
+     * Build the Consumer for one input port. It calls the receiver
+     * and forwards the tuple the receiver produces, if any, to the output.
+     * @param iportIndex index of the input port
+     * @return the Consumer
+     */
+    protected Consumer<T> consumer(int iportIndex) {
+        return tuple -> {
+            final U produced = receiver.apply(tuple, iportIndex);
+            if (produced != null) {
+                submit(produced);
+            }
+        };
+    }
+
+    /**
+     * Get the consumer of the single output port.
+     * @return the destination captured by {@link #initialize(OpletContext)}
+     */
+    protected Consumer<U> getDestination() {
+        return destination;
+    }
+
+    /**
+     * Send a tuple to the single output.
+     * @param tuple Tuple to be submitted.
+     */
+    protected void submit(U tuple) {
+        getDestination().accept(tuple);
+    }
+
+    @Override
+    public void close() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java
new file mode 100644
index 0000000..6441cd6
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Oplet with a single input whose tuples are passed, unchanged, to each
+ * of the oplet's outputs.
+ *
+ * @param <T> Tuple type.
+ */
+public final class FanOut<T> extends AbstractOplet<T, T> implements Consumer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Output consumers; captured by {@link #getInputs()}. */
+    private List<? extends Consumer<T>> targets;
+    /** Number of entries in {@code targets}; zero until {@link #getInputs()} has run. */
+    private int n;
+
+    @Override
+    public void start() {
+    }
+
+    /**
+     * Captures the current outputs from the oplet context and exposes this
+     * oplet itself as the consumer for its only input port.
+     */
+    @Override
+    public List<? extends Consumer<T>> getInputs() {
+        targets = getOpletContext().getOutputs();
+        n = targets.size();
+        return Collections.singletonList(this);
+    }
+
+    /** Hands {@code tuple} to every captured output, in list order. */
+    @Override
+    public void accept(T tuple) {
+        for (int idx = 0; idx < n; idx++) {
+            targets.get(idx).accept(tuple);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java
new file mode 100644
index 0000000..5b07ced
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+/**
+ * Oplet that allows a peek at each tuple and always forwards a tuple onto
+ * its single output port.
+ * <p>
+ * {@link #peek(Object)} is called before the tuple is forwarded
+ * and it is intended that the peek be a low cost operation
+ * such as increasing a metric.
+ *
+ * @param <T>
+ *            Type of the tuple.
+ */
+public abstract class Peek<T> extends Pipe<T, T> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Passes the tuple to {@link #peek(Object)} and then submits the
+     * same tuple to the output.
+     * @param tuple tuple received on the input port
+     */
+    @Override
+    public final void accept(T tuple) {
+        peek(tuple);
+        submit(tuple);
+    }
+
+    /**
+     * Inspect a tuple before it is forwarded.
+     * @param tuple tuple about to be submitted to the output
+     */
+    protected abstract void peek(T tuple);
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java
new file mode 100644
index 0000000..df7c4af
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java
@@ -0,0 +1,113 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.mbeans.PeriodMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.oplet.OpletContext;
+import org.apache.edgent.oplet.OutputPortContext;
+
+/**
+ * Source oplet that invokes {@link #fetchTuples()} repeatedly at a fixed rate
+ * using the {@link ScheduledExecutorService} service of its context.
+ * <p>
+ * The period is exposed through {@link PeriodMXBean}; if a
+ * {@link ControlService} is available the oplet registers itself with it
+ * when started, and changing the period reschedules the fetching.
+ *
+ * @param <T> Tuple type.
+ */
+public abstract class PeriodicSource<T> extends Source<T> implements Runnable, PeriodMXBean {
+
+    // see comment in TStream.TYPE
+    private static final String TSTREAM_TYPE = /*TStream.TYPE*/"stream";
+
+    private long period;
+    private TimeUnit unit;
+    // Currently scheduled task; stays null until start() has scheduled it.
+    private ScheduledFuture<?> future;
+
+    /**
+     * @param period time between the starts of two consecutive fetches
+     * @param unit unit of {@code period}
+     */
+    protected PeriodicSource(long period, TimeUnit unit) {
+        this.period = period;
+        this.unit = unit;
+    }
+
+    @Override
+    public void initialize(OpletContext<Void, T> context) {
+        super.initialize(context);
+    }
+
+    /**
+     * Registers this oplet as a {@link PeriodMXBean} control, when a
+     * {@link ControlService} is available, and schedules the periodic
+     * fetching with no initial delay.
+     */
+    @Override
+    public synchronized void start() {
+        ControlService cs = getOpletContext().getService(ControlService.class);
+        // TODO BUG HERE: the control alias needs to be unique across the
+        // entire provider instance (multiple topologies) because the ControlService
+        // is provider-wide, not topology specific.
+        // Scope it with just the jobId.  What's going to unregister this control?
+        if (cs != null)
+            cs.registerControl(TSTREAM_TYPE, getOpletContext().uniquify(getClass().getSimpleName()), 
+                    getAlias(), PeriodMXBean.class, this);
+        schedule(false);
+    }
+    
+    /** Alias of the first output port; may be null when the port has none. */
+    private String getAlias() {
+        OutputPortContext oc = getOpletContext().getOutputContext().get(0);
+        return oc.getAlias();
+    }
+
+    /**
+     * Schedule {@link #getRunnable()} at the current period.
+     * @param delay true to wait one full period before the first run,
+     *              false to run immediately
+     */
+    private synchronized void schedule(boolean delay) {
+        future = getOpletContext().getService(ScheduledExecutorService.class).scheduleAtFixedRate(
+                getRunnable(), delay ? getPeriod() : 0, getPeriod(), getUnit());
+    }
+
+    /**
+     * Get the task that is scheduled periodically.
+     * @return this oplet
+     */
+    protected Runnable getRunnable() {
+        return this;
+    }
+
+    /**
+     * Fetch and submit tuples; called once per period.
+     * @throws Exception on failure; {@link #run()} rethrows it wrapped
+     *         in a {@code RuntimeException}
+     */
+    protected abstract void fetchTuples() throws Exception;
+
+    /**
+     * Runs one fetch. Note that, per the {@code scheduleAtFixedRate}
+     * contract, an exception escaping from here suppresses the
+     * subsequent executions of the scheduled task.
+     */
+    @Override
+    public void run() {
+        try {
+            fetchTuples();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public synchronized long getPeriod() {
+        return period;
+    }
+
+    @Override
+    public synchronized TimeUnit getUnit() {
+        return unit;
+    }
+
+    @Override
+    public synchronized void setPeriod(long period) {
+        setPeriod(period, getUnit());
+    }
+
+    /**
+     * Change the period. When the oplet is already running the current
+     * schedule is cancelled and a new one starts after one new period;
+     * when it has not been started yet the values are only stored and
+     * take effect on {@link #start()}.
+     * @param period new period, must be greater than zero
+     * @param unit unit of {@code period}
+     * @throws IllegalArgumentException if {@code period} is not positive
+     * @throws NullPointerException if a change is requested with a null {@code unit}
+     */
+    @Override
+    public synchronized void setPeriod(long period, TimeUnit unit) {
+        if (period <= 0)
+            throw new IllegalArgumentException("period must be greater than zero: " + period);
+        if (this.period == period && this.unit == unit)
+            return; // nothing changes, keep the current schedule
+        // Reject a null unit before touching any state: otherwise the running
+        // task would be cancelled and the rescheduling would then fail.
+        if (unit == null)
+            throw new NullPointerException("unit");
+
+        // future is null until start() has run; in that case there is
+        // nothing to cancel and start() will schedule with the new values.
+        boolean started = future != null;
+        if (started)
+            future.cancel(false);
+        this.period = period;
+        this.unit = unit;
+        if (started)
+            schedule(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java
new file mode 100644
index 0000000..35c4986
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.oplet.OpletContext;
+
+/**
+ * Oplet that has exactly one input port and one output port.
+ *
+ * @param <I>
+ *            Data container type for input tuples.
+ * @param <O>
+ *            Data container type for output tuples.
+ */
+public abstract class Pipe<I, O> extends AbstractOplet<I, O> implements Consumer<I> {
+    private static final long serialVersionUID = 1L;
+
+    /** Consumer of the first output port; set by {@link #initialize(OpletContext)}. */
+    private Consumer<O> destination;
+
+    @Override
+    public void initialize(OpletContext<I, O> context) {
+        super.initialize(context);
+
+        final List<? extends Consumer<O>> outputs = context.getOutputs();
+        destination = outputs.get(0);
+    }
+
+    @Override
+    public void start() {
+    }
+
+    /**
+     * Get the input of this oplet.
+     * @return a single-element list holding this oplet, which consumes
+     *         the input tuples itself.
+     */
+    @Override
+    public List<Consumer<I>> getInputs() {
+        return Collections.singletonList(this);
+    }
+
+    /**
+     * Get the consumer of the single output port.
+     * @return the destination captured by {@link #initialize(OpletContext)}
+     */
+    protected Consumer<O> getDestination() {
+        return destination;
+    }
+
+    /**
+     * Send a tuple to the single output.
+     * @param tuple Tuple to be submitted.
+     */
+    protected void submit(O tuple) {
+        final Consumer<O> output = getDestination();
+        output.accept(tuple);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java
new file mode 100644
index 0000000..20222aa
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Source oplet whose {@link #process()} method is executed on its own
+ * thread, created through the {@link ThreadFactory} service of the
+ * oplet context.
+ *
+ * @param <T> Tuple type.
+ */
+public abstract class ProcessSource<T> extends Source<T> implements Runnable {
+
+    /**
+     * Creates a non-daemon thread for {@link #getRunnable()} and starts it.
+     */
+    @Override
+    public void start() {
+        // Use the getRunnable() hook rather than 'this' directly, the same way
+        // PeriodicSource does; by default the hook returns this oplet.
+        Thread t = getOpletContext().getService(ThreadFactory.class).newThread(getRunnable());
+        t.setDaemon(false);
+        t.start();
+    }
+
+    /**
+     * Get the task executed by the thread created in {@link #start()}.
+     * @return this oplet
+     */
+    protected Runnable getRunnable() {
+        return this;
+    }
+
+    /**
+     * Produce the tuples of this source; executed on the oplet's thread.
+     * @throws Exception on failure; {@link #run()} rethrows it wrapped
+     *         in a {@code RuntimeException}
+     */
+    protected abstract void process() throws Exception;
+
+    /** Runs {@link #process()}, converting any exception into a {@code RuntimeException}. */
+    @Override
+    public void run() {
+        try {
+            process();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java
new file mode 100644
index 0000000..33bc5eb
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import static org.apache.edgent.function.Functions.closeFunction;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Functions;
+
+/**
+ * Oplet that terminates a stream by passing every tuple to a
+ * {@link Consumer}.
+ * When the {@code sinker} function implements {@code AutoCloseable},
+ * {@code sinker.close()} is called as part of closing this oplet.
+ *
+ * @param <T> Tuple type.
+ */
+public class Sink<T> extends AbstractOplet<T, Void> {
+
+    private Consumer<T> sinker;
+
+    /**
+     * Create a {@code Sink} that discards all tuples.
+     * A different sink function can be installed with
+     * {@link #setSinker(Consumer)}.
+     */
+    public Sink() {
+        setSinker(Functions.discard());
+    }
+
+    /**
+     * Create a {@code Sink} oplet.
+     * @param sinker Processing to be performed on each tuple.
+     */
+    public Sink(Consumer<T> sinker) {
+        setSinker(sinker);
+    }
+
+    /**
+     * Get the input of this oplet.
+     * @return a single-element list holding the current sink function.
+     */
+    @Override
+    public List<Consumer<T>> getInputs() {
+        final Consumer<T> input = getSinker();
+        return Collections.singletonList(input);
+    }
+
+    @Override
+    public void start() {
+    }
+
+    /** Passes the current sink function to {@code Functions.closeFunction}. */
+    @Override
+    public void close() throws Exception {
+        final Consumer<T> current = getSinker();
+        closeFunction(current);
+    }
+
+    /**
+     * Install the sink function.
+     * @param sinker Processing to be performed on each tuple.
+     */
+    protected void setSinker(Consumer<T> sinker) {
+        this.sinker = sinker;
+    }
+
+    /**
+     * Get the function that each tuple is processed by.
+     * @return function that processes each tuple.
+     */
+    protected Consumer<T> getSinker() {
+        return sinker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java
new file mode 100644
index 0000000..eb9f2e8
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.oplet.OpletContext;
+
+/**
+ * Base class for oplets that have no input ports and submit their
+ * tuples to a single output port.
+ *
+ * @param <T> Tuple type.
+ */
+public abstract class Source<T> extends AbstractOplet<Void, T> {
+
+    /** Consumer of the first output port; set by {@link #initialize(OpletContext)}. */
+    private Consumer<T> destination;
+
+    @Override
+    public void initialize(OpletContext<Void, T> context) {
+        super.initialize(context);
+
+        final List<? extends Consumer<T>> outputs = context.getOutputs();
+        destination = outputs.get(0);
+    }
+
+    /**
+     * Get the consumer of the single output port.
+     * @return the destination captured by {@link #initialize(OpletContext)}
+     */
+    protected Consumer<T> getDestination() {
+        return destination;
+    }
+
+    /**
+     * Send a tuple to the single output.
+     * @param tuple Tuple to be submitted.
+     */
+    protected void submit(T tuple) {
+        final Consumer<T> output = getDestination();
+        output.accept(tuple);
+    }
+
+    /**
+     * A source has no inputs.
+     * @return an empty list
+     */
+    @Override
+    public final List<Consumer<Void>> getInputs() {
+        return Collections.emptyList();
+    }
+}