You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:45 UTC
[44/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java b/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java
deleted file mode 100644
index ca26a09..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/ProcessSource.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.concurrent.ThreadFactory;
-
-public abstract class ProcessSource<T> extends Source<T>implements Runnable {
-
- @Override
- public void start() {
- Thread t = getOpletContext().getService(ThreadFactory.class).newThread(this);
- t.setDaemon(false);
- t.start();
- }
-
- protected Runnable getRunnable() {
- return this;
- }
-
- protected abstract void process() throws Exception;
-
- @Override
- public void run() {
- try {
- process();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Sink.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Sink.java b/api/oplet/src/main/java/edgent/oplet/core/Sink.java
deleted file mode 100644
index f8f087d..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Sink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.function.Functions;
-
-/**
- * Sink a stream by processing each tuple through
- * a {@link Consumer}.
- * If the {@code sinker} function implements {@code AutoCloseable}
- * then when this oplet is closed {@code sinker.close()} is called.
- *
- * @param <T> Tuple type.
- */
-public class Sink<T> extends AbstractOplet<T, Void> {
-
- private Consumer<T> sinker;
-
- /**
- * Create a {@code Sink} that discards all tuples.
- * The sink function can be changed using
- * {@link #setSinker(Consumer)}.
- */
- public Sink() {
- setSinker(Functions.discard());
- }
-
- /**
- * Create a {@code Sink} oplet.
- * @param sinker Processing to be performed on each tuple.
- */
- public Sink(Consumer<T> sinker) {
- setSinker(sinker);
- }
-
- @Override
- public List<Consumer<T>> getInputs() {
- return Collections.singletonList(getSinker());
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(getSinker());
- }
-
- /**
- * Set the sink function.
- * @param sinker Processing to be performed on each tuple.
- */
- protected void setSinker(Consumer<T> sinker) {
- this.sinker = sinker;
- }
-
- /**
- * Get the sink function that processes each tuple.
- * @return function that processes each tuple.
- */
- protected Consumer<T> getSinker() {
- return sinker;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Source.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Source.java b/api/oplet/src/main/java/edgent/oplet/core/Source.java
deleted file mode 100644
index b7c1dcc..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Source.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.oplet.OpletContext;
-
-public abstract class Source<T> extends AbstractOplet<Void, T> {
-
- private Consumer<T> destination;
-
- @Override
- public void initialize(OpletContext<Void, T> context) {
- super.initialize(context);
-
- destination = context.getOutputs().get(0);
- }
-
- protected Consumer<T> getDestination() {
- return destination;
- }
-
- /**
- * Submit a tuple to single output.
- * @param tuple Tuple to be submitted.
- */
- protected void submit(T tuple) {
- getDestination().accept(tuple);
- }
-
- @Override
- public final List<Consumer<Void>> getInputs() {
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Split.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Split.java b/api/oplet/src/main/java/edgent/oplet/core/Split.java
deleted file mode 100644
index 0480f27..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Split.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-import edgent.function.ToIntFunction;
-import edgent.oplet.OpletContext;
-
-/**
- * Split a stream into multiple streams depending
- * on the result of a splitter function.
- * <BR>
- * For each tuple a function is called:
- * <UL>
- * <LI>If the return is negative the tuple is dropped.</LI>
- * <LI>Otherwise the return value is modded by the number of
- * output ports and the result is the output port index
- * the tuple is submitted to.</LI>
- * </UL>
- *
- * @param <T> Type of the tuple.
- */
-public class Split<T> extends AbstractOplet<T, T> implements Consumer<T> {
-
- private static final long serialVersionUID = 1L;
- private final ToIntFunction<T> splitter;
- private List<? extends Consumer<T>> destinations;
- private int n;
-
- public Split(ToIntFunction<T> splitter) {
- this.splitter = splitter;
- }
-
-
- @Override
- public void initialize(OpletContext<T, T> context) {
- super.initialize(context);
-
- destinations = context.getOutputs();
- n = destinations.size();
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public List<Consumer<T>> getInputs() {
- return Collections.singletonList(this);
- }
-
- @Override
- public void accept(T tuple) {
- int s = splitter.applyAsInt(tuple);
- if (s >= 0)
- destinations.get(s % n).accept(tuple);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(splitter);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/Union.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/Union.java b/api/oplet/src/main/java/edgent/oplet/core/Union.java
deleted file mode 100644
index 0237246..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/Union.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.core;
-
-import edgent.oplet.OpletContext;
-
-/**
- * Union oplet, merges multiple input ports
- * into a single output port.
- *
- * Processing for each input is identical
- * and just submits the tuple to the single output.
- *
- * @param <T> Tuple type
- */
-public final class Union<T> extends FanIn<T, T> {
-
- @Override
- public void start() {
- }
-
-
- @Override
- public void initialize(OpletContext<T, T> context) {
- super.initialize(context);
-
- // forward every tuple to the output port
- setReceiver((tuple, iportIndex) -> tuple);
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java b/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java
deleted file mode 100644
index 5ea10b6..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/mbeans/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Management beans for core oplets.
- */
-package edgent.oplet.core.mbeans;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/core/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/core/package-info.java b/api/oplet/src/main/java/edgent/oplet/core/package-info.java
deleted file mode 100644
index b208400..0000000
--- a/api/oplet/src/main/java/edgent/oplet/core/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Core primitive oplets.
- */
-package edgent.oplet.core;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Events.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Events.java b/api/oplet/src/main/java/edgent/oplet/functional/Events.java
deleted file mode 100644
index d24e4a0..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Events.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import edgent.function.Consumer;
-import edgent.oplet.core.Source;
-
-/**
- * Generate tuples from events.
- * This oplet implements {@link Consumer} which
- * can be called directly from an event handler,
- * listener or callback.
- *
- * @param <T> Data container type for output tuples.
- */
-public class Events<T> extends Source<T>implements Consumer<T> {
-
- private static final long serialVersionUID = 1L;
- private Consumer<Consumer<T>> eventSetup;
-
- public Events(Consumer<Consumer<T>> eventSetup) {
- this.eventSetup = eventSetup;
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(eventSetup);
- }
-
- @Override
- public void start() {
- // Allocate a thread so the job containing this oplet
- // doesn't look "complete" and shutdown.
- Thread endlessEventSource = getOpletContext()
- .getService(ThreadFactory.class)
- .newThread(() -> {
- try {
- Thread.sleep(Long.MAX_VALUE);
- }
- catch (InterruptedException e) {
- // cancelled; we're done.
- }
- });
- endlessEventSource.setDaemon(false);
- endlessEventSource.start();
-
- // It's possible for uses to do things like a blocking connect
- // to an external system from eventSetup.accept() so run it as
- // a task to avoid start() / submit-job timeouts.
- getOpletContext().getService(ScheduledExecutorService.class)
- .submit(() -> eventSetup.accept(this));
- }
-
- @Override
- public void accept(T tuple) {
- submit(tuple);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Filter.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Filter.java b/api/oplet/src/main/java/edgent/oplet/functional/Filter.java
deleted file mode 100644
index 4ce1590..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Filter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Predicate;
-import edgent.oplet.core.Pipe;
-
-public class Filter<T> extends Pipe<T, T> {
- private static final long serialVersionUID = 1L;
- private Predicate<T> filter;
-
- public Filter(Predicate<T> filter) {
- this.filter = filter;
- }
-
- @Override
- public void accept(T tuple) {
- if (filter.test(tuple))
- submit(tuple);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(filter);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java b/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java
deleted file mode 100644
index f1cc82b..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/FlatMap.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Function;
-import edgent.oplet.core.Pipe;
-
-/**
- *
- * Map an input tuple to 0-N output tuples.
- *
- * Uses a function that returns an iterable
- * to map the input tuple. The return value
- * of the function's apply method is
- * iterated through with each returned
- * value being submitted as an output tuple.
- *
- *
- * @param <I>
- * Data container type for input tuples.
- * @param <O>
- * Data container type for output tuples.
- */
-public class FlatMap<I, O> extends Pipe<I, O> {
- private static final long serialVersionUID = 1L;
-
- private Function<I, Iterable<O>> function;
-
- public FlatMap(Function<I, Iterable<O>> function) {
- this.function = function;
- }
-
- @Override
- public void accept(I tuple) {
- Iterable<O> outputs = function.apply(tuple);
- if (outputs != null) {
- for (O output : outputs) {
- if (output != null)
- submit(output);
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(function);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Map.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Map.java b/api/oplet/src/main/java/edgent/oplet/functional/Map.java
deleted file mode 100644
index 2d80c61..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Map.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Function;
-import edgent.oplet.core.Pipe;
-
-/**
- * Map an input tuple to 0-1 output tuple
- *
- *
- * @param <I>
- * Data container type for input tuples.
- * @param <O>
- * Data container type for output tuples.
- */
-public class Map<I, O> extends Pipe<I, O> {
- private static final long serialVersionUID = 1L;
- private Function<I, O> function;
-
- public Map(Function<I, O> function) {
- this.function = function;
- }
-
- @Override
- public void accept(I tuple) {
- O output = function.apply(tuple);
- if (output != null)
- submit(output);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(function);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/Peek.java b/api/oplet/src/main/java/edgent/oplet/functional/Peek.java
deleted file mode 100644
index 992b945..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/Peek.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Consumer;
-
-/**
- * Functional peek oplet.
- *
- * Each peek calls {@code peeker.accept(tuple)}.
- *
- * @param <T> Tuple type.
- */
-public class Peek<T> extends edgent.oplet.core.Peek<T> {
- private static final long serialVersionUID = 1L;
- private final Consumer<T> peeker;
-
- /**
- * Peek oplet using a function to peek.
- * @param peeker Function that peeks at the tuple.
- */
- public Peek(Consumer<T> peeker) {
- this.peeker = peeker;
- }
-
- protected Consumer<T> getPeeker() {
- return peeker;
- }
-
- @Override
- protected void peek(T tuple) {
- peeker.accept(tuple);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(peeker);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java b/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java
deleted file mode 100644
index 42e35cd..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/SupplierPeriodicSource.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.Supplier;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.PeriodicSource;
-
-public class SupplierPeriodicSource<T> extends PeriodicSource<T> {
-
- private Supplier<T> data;
-
- public SupplierPeriodicSource(long period, TimeUnit unit, Supplier<T> data) {
- super(period, unit);
- this.data = data;
- }
-
- @Override
- public void initialize(OpletContext<Void, T> context) {
- super.initialize(context);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(data);
- }
-
- @Override
- public void fetchTuples() {
- T tuple = data.get();
- if (tuple != null)
- submit(tuple);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java b/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java
deleted file mode 100644
index 6bf63ee..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/SupplierSource.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.functional;
-
-import static edgent.function.Functions.closeFunction;
-
-import edgent.function.Supplier;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.ProcessSource;
-
-public class SupplierSource<T> extends ProcessSource<T> {
-
- private Supplier<Iterable<T>> data;
-
- public SupplierSource() {
- }
-
- public SupplierSource(Supplier<Iterable<T>> data) {
- this.data = data;
- }
-
- @Override
- public void initialize(OpletContext<Void, T> context) {
- super.initialize(context);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(data);
- }
-
- @Override
- public void process() {
- for (T tuple : data.get()) {
- if (tuple != null)
- submit(tuple);
-
- if (Thread.currentThread().isInterrupted())
- break;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/functional/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/functional/package-info.java b/api/oplet/src/main/java/edgent/oplet/functional/package-info.java
deleted file mode 100644
index b00ca88..0000000
--- a/api/oplet/src/main/java/edgent/oplet/functional/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets that process tuples using functions.
- */
-package edgent.oplet.functional;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/package-info.java b/api/oplet/src/main/java/edgent/oplet/package-info.java
deleted file mode 100644
index 05b4013..0000000
--- a/api/oplet/src/main/java/edgent/oplet/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets API.
- * <P>
- * An oplet is a stream processor that can have 0-N input ports and 0-M output ports.
- * Tuples on streams connected to an oplet's input port are delivered to the oplet for processing.
- * The oplet submits tuples to its output ports which results in the tuples
- * being present on the connected streams.
- * </P>
- */
-package edgent.oplet;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java b/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java
deleted file mode 100644
index 2aff466..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/Barrier.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-
-import edgent.function.BiFunction;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.FanIn;
-
-/**
- * A tuple synchronization barrier.
- * <P>
- * {@code Barrier} has n input ports with tuple type {@code T}
- * and one output port with tuple type {@code List<T>}.
- * Once the oplet receives one tuple on each of its input ports,
- * it generates an output tuple containing one tuple from each input port.
- * It then awaits receiving the next collection of tuples.
- * Input port 0's tuple is in list[0], port 1's tuple in list[1], and so on.
- * </P><P>
- * Each input port has an associated queue of size {@code queueCapacity}.
- * The input port's {@code Consumer<T>.accept()} will block if it's queue is full.
- * </P>
- *
- * @param <T> Type of the tuple.
- */
-public class Barrier<T> extends FanIn<T, List<T>> {
-
- private final int queueCapacity;
- private Thread thread;
- private List<LinkedBlockingQueue<T>> iportQueues;
-
- /**
- * Create a new instance.
- * @param queueCapacity size of each input port's blocking queue
- */
- public Barrier(int queueCapacity) {
- this.queueCapacity = queueCapacity;
- }
-
- @Override
- public void initialize(OpletContext<T, List<T>> context) {
- super.initialize(context);
-
- thread = context.getService(ThreadFactory.class).newThread(() -> run());
-
- int numIports = getOpletContext().getInputCount();
- iportQueues = new ArrayList<>(numIports);
- for (int i = 0; i < numIports; i++)
- iportQueues.add(new LinkedBlockingQueue<>(queueCapacity));
-
- setReceiver(receiver());
- }
-
- @Override
- public void start() {
- thread.start();
- }
-
- protected BiFunction<T,Integer,List<T>> receiver() {
- return (tuple, iportIndex) -> {
- accept(tuple, iportIndex);
- return null;
- };
- }
-
- protected void accept(T tuple, int iportIndex) {
- try {
- iportQueues.get(iportIndex).put(tuple);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- private void run() {
- while (!Thread.interrupted()) {
- try {
- List<T> list = new ArrayList<>(iportQueues.size());
- for (LinkedBlockingQueue<T> iport : iportQueues) {
- list.add(iport.take());
- }
- submit(list);
- } catch (InterruptedException e) {
- break;
- }
- }
- }
-
- @Override
- public void close() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java b/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java
deleted file mode 100644
index 8824076..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/Isolate.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-
-/**
- * Isolate upstream processing from downstream
- * processing guaranteeing tuple order.
- * Input tuples are placed at the tail of a queue
- * and dedicated thread removes them from the
- * head and is used for downstream processing.
- *
- * @param <T> Type of the tuple.
- */
-public class Isolate<T> extends Pipe<T,T> {
- private static final long serialVersionUID = 1L;
-
- private Thread thread;
- private final LinkedBlockingQueue<T> tuples;
-
- /**
- * Create a new Isolate oplet.
- * <BR>
- * Same as Isolate(Integer.MAX_VALUE).
- */
- public Isolate() {
- this(Integer.MAX_VALUE);
- }
-
- /**
- * Create a new Isolate oplet.
- * @param queueCapacity size of the queue between the input stream
- * and the output stream.
- * {@link #accept(Object) accept} blocks when the queue is full.
- */
- public Isolate(int queueCapacity) {
- tuples = new LinkedBlockingQueue<>(queueCapacity);
- }
-
- @Override
- public void initialize(OpletContext<T, T> context) {
- super.initialize(context);
- thread = context.getService(ThreadFactory.class).newThread(() -> run());
- }
-
- @Override
- public void start() {
- super.start();
- thread.start();
- }
-
- @Override
- public void accept(T tuple) {
- try {
- tuples.put(tuple);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- private void run() {
- while (!Thread.interrupted()) {
- try {
- submit(tuples.take());
- } catch (InterruptedException e) {
- break;
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java b/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java
deleted file mode 100644
index 1b295c9..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/PressureReliever.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.LinkedList;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import edgent.function.BiConsumer;
-import edgent.function.Function;
-import edgent.function.Functions;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-import edgent.window.Partition;
-import edgent.window.PartitionedState;
-import edgent.window.Policies;
-import edgent.window.Window;
-import edgent.window.Windows;
-
-/**
- * Relieve pressure on upstream oplets by discarding tuples.
- * This oplet ensures that upstream processing is not
- * constrained by any delay in downstream processing,
- * for example by a sink oplet not being able to connect
- * to its external system.
- * When downstream processing cannot keep up with the input rate
- * this oplet maintains a defined window of the most recent
- * tuples and discards any earlier tuples using arrival order.
- * <P>
- * A window partition is maintained for each key seen
- * on the input stream. Any tuple arriving on the input
- * stream is inserted into the window. Asynchronously
- * tuples are taken from the window using FIFO and
- * submitted downstream. The submission of tuples maintains
- * order within a partition but not across partitions.
- * </P>
- * <P>
- * Tuples are <B>discarded and not</B> submitted to the
- * output port if the downstream processing cannot keep up
- * the incoming tuple rate.
- * </P>
- * <UL>
- * <LI>For a {@link #PressureReliever(int, Function) count}
- * {@code PressureReliever} up to last (most recent) {@code N} tuples
- * are maintained in a window partition.
- * <BR> Asynchronous tuple submission removes the last (oldest) tuple in the partition
- * before submitting it downstream.
- * <BR> If when an input tuple is processed the window partition contains N tuples, then
- * the first (oldest) tuple in the partition is discarded before the input tuple is inserted into the window.
- * </UL>
- * <P>
- * Insertion of the oplet into a stream disconnects the
- * upstream processing from the downstream processing,
- * so that downstream processing is executed on a different
- * thread to the thread that processed the input tuple.
- * </P>
- *
- * @param <T> Tuple type.
- * @param <K> Key type.
- */
-public class PressureReliever<T, K> extends Pipe<T, T> {
- private static final long serialVersionUID = 1L;
-
- private ScheduledExecutorService executor;
- private final Window<T, K, LinkedList<T>> window;
-
- /**
- * Pressure reliever that maintains up to {@code count} most recent tuples per key.
- *
- * @param count Number of tuples to maintain where downstream processing cannot keep up.
- * @param keyFunction Key function for tuples.
- */
- public PressureReliever(int count, Function<T, K> keyFunction) {
- window = Windows.window(
- Policies.alwaysInsert(),
- Policies.countContentsPolicy(count),
- Policies.evictOldest(),
- new FirstSubmitter(),
- keyFunction,
- () -> new LinkedList<T>());
-
- // No processing of the window takes place
- window.registerPartitionProcessor((tuples, k) -> { });
- }
-
- @Override
- public void initialize(OpletContext<T, T> context) {
- super.initialize(context);
- executor = context.getService(ScheduledExecutorService.class);
- }
-
- @Override
- public void accept(T tuple) {
- window.insert(tuple);
- }
-
- @Override
- public void close() throws Exception {
- }
-
- private class FirstSubmitter extends PartitionedState<K, AtomicBoolean>
- implements BiConsumer<Partition<T, K, LinkedList<T>>, T> {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- FirstSubmitter() {
- super(() -> new AtomicBoolean());
- }
-
- /**
- * Process the window (to consume the oldest tuple in the partition)
- * only if a tuple from this partition is not already being consumed.
- *
- * @param t
- * @param v
- */
- @Override
- public void accept(Partition<T, K, LinkedList<T>> partition, T tuple) {
- submitNextTuple(partition);
- }
-
- private void submitNextTuple(Partition<T, K, LinkedList<T>> partition) {
- final K key = partition.getKey();
- final AtomicBoolean latch = getState(key);
- if (!latch.compareAndSet(false, true))
- return;
-
- final T firstTuple;
- synchronized (partition) {
- final LinkedList<T> contents = partition.getContents();
- if (contents.isEmpty()) {
- latch.set(false);
- return;
- }
-
- firstTuple = contents.removeFirst();
- }
-
- Runnable submit = Functions.delayedConsume(getDestination(), firstTuple);
- submit = Functions.runWithFinal(submit, () -> {
- latch.set(false);
- submitNextTuple(partition);
- });
-
- executor.execute(submit);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java b/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java
deleted file mode 100644
index 9fd42b0..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/UnorderedIsolate.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.plumbing;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.Functions;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-
-/**
- * Isolate upstream processing from downstream
- * processing without guaranteeing tuple order.
- * An executor is used for downstream processing
- * thus tuple order cannot be guaranteed as the
- * scheduler does not guarantee execution order.
- *
- * @param <T> Type of the tuple.
- */
-public class UnorderedIsolate<T> extends Pipe<T,T> {
- private static final long serialVersionUID = 1L;
-
- private ScheduledExecutorService executor;
-
- @Override
- public void initialize(OpletContext<T, T> context) {
- super.initialize(context);
- executor = context.getService(ScheduledExecutorService.class);
- }
-
- @Override
- public void accept(T tuple) {
- executor.execute(Functions.delayedConsume(getDestination(), tuple));
- }
-
- @Override
- public void close() throws Exception {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java b/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java
deleted file mode 100644
index c844823..0000000
--- a/api/oplet/src/main/java/edgent/oplet/plumbing/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets that control the flow of tuples.
- */
-package edgent.oplet.plumbing;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java b/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java
deleted file mode 100644
index 4afa83d..0000000
--- a/api/oplet/src/main/java/edgent/oplet/window/Aggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.oplet.window;
-
-import static edgent.function.Functions.closeFunction;
-
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.oplet.OpletContext;
-import edgent.oplet.core.Pipe;
-import edgent.window.Window;
-
-/**
- * Aggregate a window.
- * Window contents are aggregated by a
- * {@link BiFunction aggregator function}
- * passing the list of tuples in the window and
- * the partition key. The returned value
- * is submitted to the sole output port
- * if it is not {@code null}.
- *
- * @param <T> Type of the input tuples.
- * @param <U> Type of the output tuples.
- * @param <K> Type of the partition key.
- */
-public class Aggregate<T,U,K> extends Pipe<T, U> {
- private static final long serialVersionUID = 1L;
- private final Window<T,K, ? extends List<T>> window;
- /**
- * The aggregator provided by the user.
- */
- private final BiFunction<List<T>,K, U> aggregator;
-
- public Aggregate(Window<T,K, ? extends List<T>> window, BiFunction<List<T>,K, U> aggregator){
- this.aggregator = aggregator;
- BiConsumer<List<T>, K> partProcessor = (tuples, key) -> {
- U aggregateTuple = aggregator.apply(tuples, key);
- if (aggregateTuple != null)
- submit(aggregateTuple);
- };
-
- window.registerPartitionProcessor(partProcessor);
- this.window=window;
- }
-
- @Override
- public void initialize(OpletContext<T,U> context) {
- super.initialize(context);
- window.registerScheduledExecutorService(this.getOpletContext().getService(ScheduledExecutorService.class));
- }
-
- @Override
- public void accept(T tuple) {
- window.insert(tuple);
- }
-
- @Override
- public void close() throws Exception {
- closeFunction(aggregator);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/edgent/oplet/window/package-info.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/edgent/oplet/window/package-info.java b/api/oplet/src/main/java/edgent/oplet/window/package-info.java
deleted file mode 100644
index 3f740ac..0000000
--- a/api/oplet/src/main/java/edgent/oplet/window/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Oplets using windows.
- */
-package edgent.oplet.window;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java b/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java
new file mode 100644
index 0000000..1adc9d5
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/JobContext.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+/**
+ * Information about an oplet invocation's job.
+ */
+public interface JobContext {
+ /**
+ * Get the runtime identifier for the job containing this {@link Oplet}.
+ * @return The job identifier for the application being executed.
+ */
+ String getId();
+
+ /**
+ * Get the name of the job containing this {@link Oplet}.
+ * @return The job name for the application being executed.
+ */
+ String getName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java b/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java
new file mode 100644
index 0000000..642ddc6
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/Oplet.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Generic API for an oplet that processes streaming data on 0-N input ports
+ * and produces 0-M output streams on its output ports. An input port may be
+ * connected with any number of streams from other oplets. An output port may
+ * connected to any number of input ports on other oplets.
+ *
+ * @param <I>
+ * Data container type for input tuples.
+ * @param <O>
+ * Data container type for output tuples.
+ */
+public interface Oplet<I, O> extends AutoCloseable {
+
+ /**
+ * Initialize the oplet.
+ *
+ * @param context the OpletContext
+ * @throws Exception on failure
+ */
+ void initialize(OpletContext<I, O> context) throws Exception;
+
+ /**
+ * Start the oplet. Oplets must not submit any tuples not derived from
+ * input tuples until this method is called.
+ */
+ void start();
+
+ /**
+ * Get the input stream data handlers for this oplet. The number of handlers
+ * must equal the number of configured input ports. Each tuple
+ * arriving on an input port will be sent to the stream handler for that
+ * input port.
+ *
+ * @return list of consumers
+ */
+ List<? extends Consumer<I>> getInputs();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java b/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java
new file mode 100644
index 0000000..9d7052f
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/OpletContext.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+import java.util.List;
+
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.function.Consumer;
+
+/**
+ * Context information for the {@code Oplet}'s invocation context.
+ * <P>
+ * At execution time an oplet uses its invocation context to retrieve
+ * provided {@link #getService(Class) services},
+ * {@link #getOutputs() output ports} for tuple submission
+ * and {@link #getJobContext() job} information.
+ *
+ * @param <I> tuple type of input streams
+ * @param <O> tuple type of output streams
+ */
+public interface OpletContext<I, O> extends RuntimeServices {
+
+ /**
+ * Get the unique identifier (within the running job)
+ * for this oplet.
+ * @return unique identifier for this oplet
+ */
+ String getId();
+
+ /**
+ * {@inheritDoc}
+ * <P>
+ * Get a service for this oplet invocation.
+ *
+ * An invocation of an oplet may get access to services,
+ * which provide specific functionality, such as metrics.
+ * </P>
+ *
+ */
+ @Override
+ <T> T getService(Class<T> serviceClass);
+
+ /**
+ * Get the number of connected inputs to this oplet.
+ * @return number of connected inputs to this oplet.
+ */
+ int getInputCount();
+
+ /**
+ * Get the number of connected outputs to this oplet.
+ * @return number of connected outputs to this oplet.
+ */
+ int getOutputCount();
+
+ /**
+ * Get the mechanism to submit tuples on an output port.
+ *
+ * @return list of consumers
+ */
+ List<? extends Consumer<O>> getOutputs();
+
+ /**
+ * Get the oplet's output port context information.
+ * @return list of {@link OutputPortContext}, one for each output port.
+ */
+ List<OutputPortContext> getOutputContext();
+
+ /**
+ * Get the job hosting this oplet.
+ * @return {@link JobContext} hosting this oplet invocation.
+ */
+ JobContext getJobContext();
+
+ /**
+ * Creates a unique name within the context of the current runtime.
+ * <p>
+ * The default implementation adds a suffix composed of the package
+ * name of this interface, the current job and oplet identifiers,
+ * all separated by periods ({@code '.'}). Developers should use this
+ * method to avoid name clashes when they store or register the name in
+ * an external container or registry.
+ *
+ * @param name name (possibly non-unique)
+ * @return unique name within the context of the current runtime.
+ */
+ String uniquify(String name);
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java b/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java
new file mode 100644
index 0000000..63fe2c8
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/OutputPortContext.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet;
+
+/**
+ * Information about an oplet output port.
+ */
+public interface OutputPortContext {
+ /**
+ * Get the alias of the output port if any.
+ * @return the alias; null if none.
+ */
+ String getAlias();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java
new file mode 100644
index 0000000..5656e8b
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/AbstractOplet.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.oplet.OpletContext;
+
+public abstract class AbstractOplet<I, O> implements Oplet<I, O> {
+
+ private OpletContext<I, O> context;
+
+ @Override
+ public void initialize(OpletContext<I, O> context) {
+ this.context = context;
+ }
+
+ public final OpletContext<I, O> getOpletContext() {
+ return context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java
new file mode 100644
index 0000000..ecdfafb
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanIn.java
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.oplet.OpletContext;
+
+/**
+ * FanIn oplet, merges multiple input ports into a single output port.
+ * <P>
+ * For each tuple received, {@code receiver.apply(T tuple, Integer index)}
+ * is called. {@code index} is the tuple's input stream's index, where
+ * {@code this} is index 0 followed by {@code others} in their order.
+ * {@code receiver} either returns a tuple to emit on the output
+ * stream or null.
+ * </P>
+ *
+ * @param <T> Tuple type of input streams
+ * @param <U> Tuple type of output stream
+ */
+public class FanIn<T,U> extends AbstractOplet<T, U> {
+ private BiFunction<T, Integer, U> receiver;
+ private List<Consumer<T>> iportConsumers;
+ private Consumer<U> destination;
+
+ public FanIn() {
+ }
+
+ public FanIn(BiFunction<T, Integer, U> receiver) {
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void initialize(OpletContext<T, U> context) {
+ super.initialize(context);
+ destination = context.getOutputs().get(0);
+
+ // Create a consumer for each input port.
+ int numIports = getOpletContext().getInputCount();
+ if (iportConsumers == null) {
+ // each iport invokes the receiver
+ iportConsumers = new ArrayList<>(numIports);
+ for (int i = 0; i < numIports; i++)
+ iportConsumers.add(consumer(i));
+ iportConsumers = Collections.unmodifiableList(iportConsumers);
+ }
+ }
+
+ /**
+ * Set the receiver function. Must be called no later than as part
+ * of {@link #initialize(OpletContext)}.
+ * @param receiver function to receive tuples
+ */
+ protected void setReceiver(BiFunction<T, Integer, U> receiver) {
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public List<? extends Consumer<T>> getInputs() {
+ return iportConsumers;
+ }
+
+ /**
+ * Create a Consumer for the input port that invokes the
+ * receiver and submits a generated tuple, if any, to the output.
+ * @param iportIndex index of the input port
+ * @return the Consumer
+ */
+ protected Consumer<T> consumer(int iportIndex) {
+ return tuple -> {
+ U result = receiver.apply(tuple, iportIndex);
+ if (result != null)
+ submit(result);
+ };
+ }
+
+ protected Consumer<U> getDestination() {
+ return destination;
+ }
+
+ /**
+ * Submit a tuple to single output.
+ * @param tuple Tuple to be submitted.
+ */
+ protected void submit(U tuple) {
+ getDestination().accept(tuple);
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java
new file mode 100644
index 0000000..6441cd6
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/FanOut.java
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+
+public final class FanOut<T> extends AbstractOplet<T, T> implements Consumer<T> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private List<? extends Consumer<T>> targets;
+ private int n;
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public List<? extends Consumer<T>> getInputs() {
+ targets = getOpletContext().getOutputs();
+ n = targets.size();
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public void accept(T tuple) {
+ for (int i = 0; i < n; i++)
+ targets.get(i).accept(tuple);
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java
new file mode 100644
index 0000000..5b07ced
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Peek.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+/**
+ * Oplet that allows a peek at each tuple and always forwards a tuple onto
+ * its single output port.
+ *
+ * {@link #peek(Object)} is called before the tuple is forwarded
+ * and it is intended that the peek be a low cost operation
+ * such as increasing a metric.
+ *
+ * @param <T>
+ * Type of the tuple.
+ */
+public abstract class Peek<T> extends Pipe<T, T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public final void accept(T tuple) {
+ peek(tuple);
+ submit(tuple);
+ }
+
+ protected abstract void peek(T tuple);
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java
new file mode 100644
index 0000000..df7c4af
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/PeriodicSource.java
@@ -0,0 +1,113 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.mbeans.PeriodMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.oplet.OpletContext;
+import org.apache.edgent.oplet.OutputPortContext;
+
+public abstract class PeriodicSource<T> extends Source<T> implements Runnable, PeriodMXBean {
+
+ // see comment in TStream.TYPE
+ private static final String TSTREAM_TYPE = /*TStream.TYPE*/"stream";
+
+ private long period;
+ private TimeUnit unit;
+ private ScheduledFuture<?> future;
+
+ protected PeriodicSource(long period, TimeUnit unit) {
+ this.period = period;
+ this.unit = unit;
+ }
+
+ @Override
+ public void initialize(OpletContext<Void, T> context) {
+ super.initialize(context);
+ }
+
+ @Override
+ public synchronized void start() {
+ ControlService cs = getOpletContext().getService(ControlService.class);
+ // TODO BUG HERE: the control alias needs to be unique across the
+ // entire provider instance (multiple topologies) because the ControlService
+ // is provider-wide, not topology specific.
+ // Scope it with just the jobId. What's going to unregister this control?
+ if (cs != null)
+ cs.registerControl(TSTREAM_TYPE, getOpletContext().uniquify(getClass().getSimpleName()),
+ getAlias(), PeriodMXBean.class, this);
+ schedule(false);
+ }
+
+ private String getAlias() {
+ OutputPortContext oc = getOpletContext().getOutputContext().get(0);
+ return oc.getAlias();
+ }
+
+ private synchronized void schedule(boolean delay) {
+ future = getOpletContext().getService(ScheduledExecutorService.class).scheduleAtFixedRate(
+ getRunnable(), delay ? getPeriod() : 0, getPeriod(), getUnit());
+ }
+
+ protected Runnable getRunnable() {
+ return this;
+ }
+
+ protected abstract void fetchTuples() throws Exception;
+
+ @Override
+ public void run() {
+ try {
+ fetchTuples();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public synchronized long getPeriod() {
+ return period;
+ }
+
+ @Override
+ public synchronized TimeUnit getUnit() {
+ return unit;
+ }
+
+ @Override
+ public synchronized void setPeriod(long period) {
+ setPeriod(period, getUnit());
+ }
+
+ @Override
+ public synchronized void setPeriod(long period, TimeUnit unit) {
+ if (period <= 0)
+ throw new IllegalArgumentException();
+ if (this.period != period || this.unit != unit) {
+ future.cancel(false);
+ this.period = period;
+ this.unit = unit;
+ schedule(true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java
new file mode 100644
index 0000000..35c4986
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Pipe.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.oplet.OpletContext;
+
+/**
+ * Pipe oplet with a single input and output.
+ *
+ * @param <I>
+ * Data container type for input tuples.
+ * @param <O>
+ * Data container type for output tuples.
+ */
+public abstract class Pipe<I, O> extends AbstractOplet<I, O>implements Consumer<I> {
+ private static final long serialVersionUID = 1L;
+
+ private Consumer<O> destination;
+
+ @Override
+ public void initialize(OpletContext<I, O> context) {
+ super.initialize(context);
+
+ destination = context.getOutputs().get(0);
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public List<Consumer<I>> getInputs() {
+ return Collections.singletonList(this);
+ }
+
+ protected Consumer<O> getDestination() {
+ return destination;
+ }
+
+ /**
+ * Submit a tuple to single output.
+ * @param tuple Tuple to be submitted.
+ */
+ protected void submit(O tuple) {
+ getDestination().accept(tuple);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java
new file mode 100644
index 0000000..20222aa
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/ProcessSource.java
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.concurrent.ThreadFactory;
+
+public abstract class ProcessSource<T> extends Source<T>implements Runnable {
+
+ @Override
+ public void start() {
+ Thread t = getOpletContext().getService(ThreadFactory.class).newThread(this);
+ t.setDaemon(false);
+ t.start();
+ }
+
+ protected Runnable getRunnable() {
+ return this;
+ }
+
+ protected abstract void process() throws Exception;
+
+ @Override
+ public void run() {
+ try {
+ process();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java
new file mode 100644
index 0000000..33bc5eb
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Sink.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import static org.apache.edgent.function.Functions.closeFunction;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Functions;
+
+/**
+ * Sink a stream by processing each tuple through
+ * a {@link Consumer}.
+ * If the {@code sinker} function implements {@code AutoCloseable}
+ * then when this oplet is closed {@code sinker.close()} is called.
+ *
+ * @param <T> Tuple type.
+ */
+public class Sink<T> extends AbstractOplet<T, Void> {
+
+ private Consumer<T> sinker;
+
+ /**
+ * Create a {@code Sink} that discards all tuples.
+ * The sink function can be changed using
+ * {@link #setSinker(Consumer)}.
+ */
+ public Sink() {
+ setSinker(Functions.discard());
+ }
+
+ /**
+ * Create a {@code Sink} oplet.
+ * @param sinker Processing to be performed on each tuple.
+ */
+ public Sink(Consumer<T> sinker) {
+ setSinker(sinker);
+ }
+
+ @Override
+ public List<Consumer<T>> getInputs() {
+ return Collections.singletonList(getSinker());
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeFunction(getSinker());
+ }
+
+ /**
+ * Set the sink function.
+ * @param sinker Processing to be performed on each tuple.
+ */
+ protected void setSinker(Consumer<T> sinker) {
+ this.sinker = sinker;
+ }
+
+ /**
+ * Get the sink function that processes each tuple.
+ * @return function that processes each tuple.
+ */
+ protected Consumer<T> getSinker() {
+ return sinker;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java
new file mode 100644
index 0000000..eb9f2e8
--- /dev/null
+++ b/api/oplet/src/main/java/org/apache/edgent/oplet/core/Source.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.oplet.core;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.oplet.OpletContext;
+
+public abstract class Source<T> extends AbstractOplet<Void, T> {
+
+ private Consumer<T> destination;
+
+ @Override
+ public void initialize(OpletContext<Void, T> context) {
+ super.initialize(context);
+
+ destination = context.getOutputs().get(0);
+ }
+
+ protected Consumer<T> getDestination() {
+ return destination;
+ }
+
+ /**
+ * Submit a tuple to single output.
+ * @param tuple Tuple to be submitted.
+ */
+ protected void submit(T tuple) {
+ getDestination().accept(tuple);
+ }
+
+ @Override
+ public final List<Consumer<Void>> getInputs() {
+ return Collections.emptyList();
+ }
+}