You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/18 06:59:58 UTC
[1/2] samza git commit: Javadoc cleanup for new Application,
Descriptor, Context and Table APIs.
Repository: samza
Updated Branches:
refs/heads/master a8a8dc78d -> 5f1e75247
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
index f13352c..dcc15a8 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
@@ -22,12 +22,14 @@ import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.serializers.Serde;
+
/**
- * A descriptor for a kafka output stream.
+ * A {@link KafkaOutputDescriptor} can be used for specifying Samza and Kafka-specific properties of Kafka
+ * output streams.
* <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
+ * Use {@link KafkaSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
index 6fb8c1c..091c21a 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
@@ -34,9 +34,12 @@ import org.apache.samza.system.kafka.KafkaSystemFactory;
/**
- * A descriptor for a Kafka system.
+ * A {@link KafkaSystemDescriptor} can be used for specifying Samza and Kafka-specific properties of a Kafka
+ * input/output system. It can also be used for obtaining {@link KafkaInputDescriptor}s and
+ * {@link KafkaOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
+ * Kafka input/output streams.
* <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
*/
@SuppressWarnings("unchecked")
public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescriptor>
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 8265414..47d6cf0 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -47,10 +47,10 @@ public class SamzaSqlApplication implements StreamApplication {
private AtomicInteger queryId = new AtomicInteger(0);
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
try {
// TODO: Introduce an API to return a dsl string containing one or more sql statements.
- List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDesc.getConfig());
+ List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDescriptor.getConfig());
Map<Integer, TranslatorContext> translatorContextMap = new HashMap<>();
@@ -59,21 +59,21 @@ public class SamzaSqlApplication implements StreamApplication {
Set<String> outputSystemStreams = new HashSet<>();
Collection<RelRoot> relRoots =
- SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDesc.getConfig(),
+ SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDescriptor.getConfig(),
inputSystemStreams, outputSystemStreams);
// 2. Populate configs
SamzaSqlApplicationConfig sqlConfig =
- new SamzaSqlApplicationConfig(appDesc.getConfig(), inputSystemStreams, outputSystemStreams);
+ new SamzaSqlApplicationConfig(appDescriptor.getConfig(), inputSystemStreams, outputSystemStreams);
// 3. Translate Calcite plan to Samza stream operators
- QueryTranslator queryTranslator = new QueryTranslator(appDesc, sqlConfig);
+ QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig);
SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
Map<String, SamzaRelConverter> converters = sqlConfig.getSamzaRelConverters();
for (RelRoot relRoot : relRoots) {
LOG.info("Translating relRoot {} to samza stream graph", relRoot);
int qId = queryId.incrementAndGet();
- TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, converters);
+ TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext, converters);
translatorContextMap.put(qId, translatorContext);
queryTranslator.translate(relRoot, translatorContext, qId);
}
@@ -85,7 +85,7 @@ public class SamzaSqlApplication implements StreamApplication {
* container, so it does not need to be serialized. Therefore, the translatorContext is recreated in each container
* and does not need to be serialized.
*/
- appDesc.withApplicationTaskContextFactory((jobContext,
+ appDescriptor.withApplicationTaskContextFactory((jobContext,
containerContext,
taskContext,
applicationContainerContext) ->
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
index ba9c8b3..766b529 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -55,7 +55,7 @@ public class AppWithGlobalConfigExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -65,15 +65,15 @@ public class AppWithGlobalConfigExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
- appDesc.getInputStream(inputStreamDescriptor)
+ appDescriptor.getInputStream(inputStreamDescriptor)
.window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1,
null, null)
.setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
.setAccumulationMode(AccumulationMode.DISCARDING), "window1")
.map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
- .sendTo(appDesc.getOutputStream(outputStreamDescriptor));
+ .sendTo(appDescriptor.getOutputStream(outputStreamDescriptor));
- appDesc.withMetricsReporterFactories(new HashMap<>());
+ appDescriptor.withMetricsReporterFactories(new HashMap<>());
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
index 7721d44..bf641ce 100644
--- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -50,7 +50,7 @@ public class BroadcastExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent =
@@ -62,10 +62,10 @@ public class BroadcastExample implements StreamApplication {
KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 =
trackingSystem.getOutputDescriptor("outStream3", serde);
- MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageViewEvent);
- inputStream.filter(m -> m.key.equals("key1")).sendTo(appDesc.getOutputStream(outStream1));
- inputStream.filter(m -> m.key.equals("key2")).sendTo(appDesc.getOutputStream(outStream2));
- inputStream.filter(m -> m.key.equals("key3")).sendTo(appDesc.getOutputStream(outStream3));
+ MessageStream<KV<String, PageViewEvent>> inputStream = appDescriptor.getInputStream(pageViewEvent);
+ inputStream.filter(m -> m.key.equals("key1")).sendTo(appDescriptor.getOutputStream(outStream1));
+ inputStream.filter(m -> m.key.equals("key2")).sendTo(appDescriptor.getOutputStream(outStream2));
+ inputStream.filter(m -> m.key.equals("key3")).sendTo(appDescriptor.getOutputStream(outStream3));
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index 4923b7d..444039a 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -58,7 +58,7 @@ public class KeyValueStoreExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -68,9 +68,9 @@ public class KeyValueStoreExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
- appDesc.withDefaultSystem(trackingSystem);
- MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
- OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
+ appDescriptor.withDefaultSystem(trackingSystem);
+ MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+ OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDescriptor.getOutputStream(outputStreamDescriptor);
pageViewEvents
.partitionBy(pve -> pve.memberId, pve -> pve,
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
index ac0db36..e3eee23 100644
--- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -49,7 +49,7 @@ public class MergeExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
@@ -64,8 +64,8 @@ public class MergeExample implements StreamApplication {
trackingSystem.getOutputDescriptor("mergedStream", serde);
MessageStream
- .mergeAll(ImmutableList.of(appDesc.getInputStream(isd1), appDesc.getInputStream(isd2), appDesc.getInputStream(isd3)))
- .sendTo(appDesc.getOutputStream(osd));
+ .mergeAll(ImmutableList.of(appDescriptor.getInputStream(isd1), appDescriptor.getInputStream(isd2), appDescriptor.getInputStream(isd3)))
+ .sendTo(appDescriptor.getOutputStream(osd));
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
index ea38984..54cced1 100644
--- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -50,7 +50,7 @@ public class OrderShipmentJoinExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<OrderRecord> orderStreamDescriptor =
@@ -61,12 +61,12 @@ public class OrderShipmentJoinExample implements StreamApplication {
trackingSystem.getOutputDescriptor("fulfilledOrders",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
- appDesc.getInputStream(orderStreamDescriptor)
- .join(appDesc.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
+ appDescriptor.getInputStream(orderStreamDescriptor)
+ .join(appDescriptor.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
Duration.ofMinutes(1), "join")
.map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
- .sendTo(appDesc.getOutputStream(fulfilledOrdersStreamDescriptor));
+ .sendTo(appDescriptor.getOutputStream(fulfilledOrdersStreamDescriptor));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index 1476c81..5fe7b9c 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -58,7 +58,7 @@ public class PageViewCounterExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -68,8 +68,8 @@ public class PageViewCounterExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
- MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
- OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDesc.getOutputStream(outputStreamDescriptor);
+ MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+ OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDescriptor.getOutputStream(outputStreamDescriptor);
SupplierFunction<Integer> initialValue = () -> 0;
FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
index 2cf3ac3..19403b0 100644
--- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -54,7 +54,7 @@ public class RepartitionExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -64,9 +64,9 @@ public class RepartitionExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
- appDesc.withDefaultSystem(trackingSystem);
- MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
- OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
+ appDescriptor.withDefaultSystem(trackingSystem);
+ MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+ OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDescriptor.getOutputStream(outputStreamDescriptor);
pageViewEvents
.partitionBy(pve -> pve.memberId, pve -> pve,
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
index 8f6c6f8..44528e6 100644
--- a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
@@ -60,18 +60,18 @@ public class TaskApplicationExample implements TaskApplication {
}
@Override
- public void describe(TaskApplicationDescriptor appDesc) {
+ public void describe(TaskApplicationDescriptor appDescriptor) {
// add input and output streams
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
TableDescriptor td = new RocksDbTableDescriptor("mytable");
- appDesc.addInputStream(isd);
- appDesc.addOutputStream(osd);
- appDesc.addTable(td);
+ appDescriptor.addInputStream(isd);
+ appDescriptor.addOutputStream(osd);
+ appDescriptor.addTable(td);
// create the task factory based on configuration
- appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+ appDescriptor.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
index 51089f7..426fd8d 100644
--- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -57,7 +57,7 @@ public class WindowExample implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -67,8 +67,8 @@ public class WindowExample implements StreamApplication {
SupplierFunction<Integer> initialValue = () -> 0;
FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
- MessageStream<PageViewEvent> inputStream = appDesc.getInputStream(inputStreamDescriptor);
- OutputStream<Integer> outputStream = appDesc.getOutputStream(outputStreamDescriptor);
+ MessageStream<PageViewEvent> inputStream = appDescriptor.getInputStream(inputStreamDescriptor);
+ OutputStream<Integer> outputStream = appDescriptor.getOutputStream(outputStreamDescriptor);
// create a tumbling window that outputs the number of message collected every 10 minutes.
// also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index 2e51f6a..2002ce6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -37,9 +37,9 @@ public class TestStandaloneIntegrationApplication implements StreamApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
String systemName = "testSystemName";
- String inputStreamName = appDesc.getConfig().get("input.stream.name");
+ String inputStreamName = appDescriptor.getConfig().get("input.stream.name");
String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName);
KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName);
@@ -49,6 +49,6 @@ public class TestStandaloneIntegrationApplication implements StreamApplication {
kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
KafkaOutputDescriptor<KV<Object, Object>> osd =
kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde);
- appDesc.getInputStream(isd).sendTo(appDesc.getOutputStream(osd));
+ appDescriptor.getInputStream(isd).sendTo(appDescriptor.getOutputStream(osd));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 672837b..6f381e2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -96,11 +96,11 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
class PipelineApplication implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<KV<String, PageView>> isd =
sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
- appDesc.getInputStream(isd)
+ appDescriptor.getInputStream(isd)
.map(KV::getValue)
.partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
.sink((m, collector, coordinator) -> {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 8431f57..74c32b4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -151,11 +151,11 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
class TestStreamApp implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<KV<String, PageView>> isd =
sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
- appDesc.getInputStream(isd)
+ appDescriptor.getInputStream(isd)
.map(KV::getValue)
.partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
.sink((m, collector, coordinator) -> {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
index ef17a22..28d790e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -35,14 +35,14 @@ public class BroadcastAssertApp implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
- Config config = appDesc.getConfig();
+ public void describe(StreamApplicationDescriptor appDescriptor) {
+ Config config = appDescriptor.getConfig();
String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
- final MessageStream<PageView> broadcastPageViews = appDesc
+ final MessageStream<PageView> broadcastPageViews = appDescriptor
.getInputStream(isd)
.broadcast(serde, "pv");
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
index 649c032..eca62d0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -92,15 +92,15 @@ public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness
private static transient CountDownLatch containerShutdownLatch;
@Override
- public void describe(TaskApplicationDescriptor appDesc) {
- Config config = appDesc.getConfig();
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ Config config = appDescriptor.getConfig();
String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
- appDesc.addInputStream(isd);
- appDesc.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch));
+ appDescriptor.addInputStream(isd);
+ appDescriptor.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch));
}
private static class FaultInjectionTask implements StreamTask, ClosableTask {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index a442140..476c0dc 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -129,20 +129,20 @@ public class StreamApplicationIntegrationTest {
private static class PageViewProfileViewJoinApplication implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
- Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(
+ public void describe(StreamApplicationDescriptor appDescriptor) {
+ Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(
new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
- appDesc.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
+ appDescriptor.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
- OutputStream<TestTableData.EnrichedPageView> outputStream = appDesc.getOutputStream(enrichedPageViewOSD);
- appDesc.getInputStream(pageViewISD)
+ OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
+ appDescriptor.getInputStream(pageViewISD)
.partitionBy(TestTableData.PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(
TestTableData.PageView.class)), "p1")
.join(table, new PageViewToProfileJoinFunction())
@@ -152,22 +152,22 @@ public class StreamApplicationIntegrationTest {
private static class PageViewFilterApplication implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
KafkaInputDescriptor<KV<String, PageView>> isd =
ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
- MessageStream<KV<String, TestData.PageView>> inputStream = appDesc.getInputStream(isd);
+ MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox"));
}
}
private static class PageViewRepartitionApplication implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
KafkaInputDescriptor<KV<String, PageView>> isd =
ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
- MessageStream<KV<String, TestData.PageView>> inputStream = appDesc.getInputStream(isd);
+ MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
inputStream
.map(KV::getValue)
.partitionBy(PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)), "p1")
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index aa9e107..184bb12 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -207,18 +207,18 @@ public class StreamTaskIntegrationTest {
static public class JoinTaskApplication implements TaskApplication {
@Override
- public void describe(TaskApplicationDescriptor appDesc) {
- appDesc.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask());
- appDesc.addTable(new InMemoryTableDescriptor("profile-view-store",
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ appDescriptor.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask());
+ appDescriptor.addTable(new InMemoryTableDescriptor("profile-view-store",
KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
- appDesc.addInputStream(profileISD);
+ appDescriptor.addInputStream(profileISD);
KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDesc.addInputStream(pageViewISD);
+ appDescriptor.addInputStream(pageViewISD);
KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD =
ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
- appDesc.addOutputStream(enrichedPageViewOSD);
+ appDescriptor.addOutputStream(enrichedPageViewOSD);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
index 20f18ee..b6d3ed8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
@@ -39,11 +39,11 @@ public class TestSchedulingApp implements StreamApplication {
public static final String PAGE_VIEWS = "page-views";
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde);
- final MessageStream<PageView> pageViews = appDesc.getInputStream(isd);
+ final MessageStream<PageView> pageViews = appDescriptor.getInputStream(isd);
final MessageStream<PageView> output = pageViews.flatMap(new FlatmapScheduledFn());
MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde)
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index dda31ea..24726f8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -55,10 +55,10 @@ public class RepartitionJoinWindowApp implements StreamApplication {
private final List<String> intermediateStreamIds = new ArrayList<>();
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
// offset.default = oldest required for tests since checkpoint topic is empty on start and messages are published
// before the application is run
- Config config = appDesc.getConfig();
+ Config config = appDescriptor.getConfig();
String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY);
String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY);
String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY);
@@ -66,8 +66,8 @@ public class RepartitionJoinWindowApp implements StreamApplication {
KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, new JsonSerdeV2<>(PageView.class));
KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, new JsonSerdeV2<>(AdClick.class));
- MessageStream<PageView> pageViews = appDesc.getInputStream(id1);
- MessageStream<AdClick> adClicks = appDesc.getInputStream(id2);
+ MessageStream<PageView> pageViews = appDescriptor.getInputStream(id1);
+ MessageStream<AdClick> adClicks = appDescriptor.getInputStream(id2);
MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews
.partitionBy(PageView::getViewId, pv -> pv,
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
index fdf0761..fe8e318 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -47,19 +47,19 @@ public class RepartitionWindowApp implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KVSerde<String, PageView> inputSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new StringSerde());
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<KV<String, PageView>> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
KafkaOutputDescriptor<KV<String, String>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
- appDesc.getInputStream(id)
+ appDescriptor.getInputStream(id)
.map(KV::getValue)
.partitionBy(PageView::getUserId, m -> m, inputSerde, "p1")
.window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1")
.map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage())))
- .sendTo(appDesc.getOutputStream(od));
+ .sendTo(appDescriptor.getOutputStream(od));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 6dd4303..508e3dc 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -57,15 +57,15 @@ public class SessionWindowApp implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
- MessageStream<PageView> pageViews = appDesc.getInputStream(id);
- OutputStream<KV<String, Integer>> outputStream = appDesc.getOutputStream(od);
+ MessageStream<PageView> pageViews = appDescriptor.getInputStream(id);
+ OutputStream<KV<String, Integer>> outputStream = appDescriptor.getOutputStream(od);
pageViews
.filter(m -> !FILTER_KEY.equals(m.getUserId()))
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 4b87169..d1bd44f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -59,15 +59,15 @@ public class TumblingWindowApp implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
- MessageStream<PageView> pageViews = appDesc.getInputStream(id);
- OutputStream<KV<String, Integer>> outputStream = appDesc.getOutputStream(od);
+ MessageStream<PageView> pageViews = appDescriptor.getInputStream(id);
+ OutputStream<KV<String, Integer>> outputStream = appDescriptor.getOutputStream(od);
pageViews
.filter(m -> !FILTER_KEY.equals(m.getUserId()))
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
index 0991fa1..a2170cc 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
@@ -59,12 +59,12 @@ public class TestStreamApplication implements StreamApplication {
}
@Override
- public void describe(StreamApplicationDescriptor streamAppDesc) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
KafkaInputDescriptor<String> isd = ksd.getInputDescriptor(inputTopic, new NoOpSerde<>());
KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor(outputTopic, new StringSerde());
- MessageStream<String> inputStream = streamAppDesc.getInputStream(isd);
- OutputStream<String> outputStream = streamAppDesc.getOutputStream(osd);
+ MessageStream<String> inputStream = appDescriptor.getInputStream(isd);
+ OutputStream<String> outputStream = appDescriptor.getOutputStream(osd);
inputStream.map(new TestMapFunction(appName, processorName)).sendTo(outputStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index a9741b4..5a977e2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -416,12 +416,12 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
static public class MyTaskApplication implements TaskApplication {
@Override
- public void describe(TaskApplicationDescriptor appDesc) {
- appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
- appDesc.addTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())));
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ appDescriptor.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+ appDescriptor.addTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDesc.addInputStream(pageViewISD);
+ appDescriptor.addInputStream(pageViewISD);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 5852de5..4e410d9 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -130,14 +130,14 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
static final String PROFILE_TABLE = "profile-table";
@Override
- public void describe(StreamApplicationDescriptor appDesc) {
- Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor());
+ public void describe(StreamApplicationDescriptor appDescriptor) {
+ Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
KafkaSystemDescriptor sd =
new KafkaSystemDescriptor("test");
- appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
+ appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
.partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
.join(table, new PageViewToProfileJoinFunction())
- .sendTo(appDesc.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
+ .sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
}
protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
[2/2] samza git commit: Javadoc cleanup for new Application,
Descriptor, Context and Table APIs.
Posted by ja...@apache.org.
Javadoc cleanup for new Application, Descriptor, Context and Table APIs.
Author: Prateek Maheshwari <pm...@apache.org>
Reviewers: Cameron Lee <ca...@linkedin.com>
Closes #737 from prateekm/javadoc-cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5f1e7524
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5f1e7524
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5f1e7524
Branch: refs/heads/master
Commit: 5f1e75247db7decf933b508d77ce76abdbabbdf4
Parents: a8a8dc7
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Wed Oct 17 23:58:16 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 17 23:58:16 2018 -0700
----------------------------------------------------------------------
.../samza/application/SamzaApplication.java | 24 +++++--
.../samza/application/StreamApplication.java | 67 ++++++++++---------
.../samza/application/TaskApplication.java | 69 ++++++++------------
.../descriptors/ApplicationDescriptor.java | 61 ++++++++++-------
.../StreamApplicationDescriptor.java | 11 +++-
.../descriptors/TaskApplicationDescriptor.java | 22 +++++--
.../context/ApplicationContainerContext.java | 36 ++++++----
.../ApplicationContainerContextFactory.java | 25 +++----
.../samza/context/ApplicationTaskContext.java | 32 +++++----
.../context/ApplicationTaskContextFactory.java | 30 ++++-----
.../apache/samza/context/ContainerContext.java | 21 +++---
.../java/org/apache/samza/context/Context.java | 56 ++++++++--------
.../org/apache/samza/context/JobContext.java | 20 +++---
.../org/apache/samza/context/TaskContext.java | 53 +++++++++------
.../descriptors/GenericInputDescriptor.java | 14 ++--
.../descriptors/GenericOutputDescriptor.java | 14 ++--
.../descriptors/GenericSystemDescriptor.java | 13 ++--
.../system/descriptors/InputDescriptor.java | 8 ++-
.../system/descriptors/OutputDescriptor.java | 8 ++-
.../system/descriptors/StreamDescriptor.java | 9 ++-
.../system/descriptors/SystemDescriptor.java | 9 ++-
.../main/java/org/apache/samza/table/Table.java | 21 +++++-
.../table/descriptors/TableDescriptor.java | 37 +++++++----
.../java/org/apache/samza/task/TaskFactory.java | 4 +-
.../descriptors/EventHubsInputDescriptor.java | 7 +-
.../descriptors/EventHubsOutputDescriptor.java | 8 ++-
.../descriptors/EventHubsSystemDescriptor.java | 7 +-
.../application/LegacyTaskApplication.java | 4 +-
.../application/MockStreamApplication.java | 2 +-
.../samza/application/TestApplicationUtil.java | 2 +-
.../samza/execution/TestExecutionPlanner.java | 2 +-
.../kafka/descriptors/KafkaInputDescriptor.java | 8 ++-
.../descriptors/KafkaOutputDescriptor.java | 8 ++-
.../descriptors/KafkaSystemDescriptor.java | 7 +-
.../samza/sql/runner/SamzaSqlApplication.java | 14 ++--
.../example/AppWithGlobalConfigExample.java | 8 +--
.../apache/samza/example/BroadcastExample.java | 10 +--
.../samza/example/KeyValueStoreExample.java | 8 +--
.../org/apache/samza/example/MergeExample.java | 6 +-
.../samza/example/OrderShipmentJoinExample.java | 8 +--
.../samza/example/PageViewCounterExample.java | 6 +-
.../samza/example/RepartitionExample.java | 8 +--
.../samza/example/TaskApplicationExample.java | 10 +--
.../org/apache/samza/example/WindowExample.java | 6 +-
.../TestStandaloneIntegrationApplication.java | 6 +-
.../EndOfStreamIntegrationTest.java | 4 +-
.../WatermarkIntegrationTest.java | 4 +-
.../test/framework/BroadcastAssertApp.java | 6 +-
.../test/framework/FaultInjectionTest.java | 8 +--
.../StreamApplicationIntegrationTest.java | 18 ++---
.../framework/StreamTaskIntegrationTest.java | 12 ++--
.../samza/test/framework/TestSchedulingApp.java | 4 +-
.../test/operator/RepartitionJoinWindowApp.java | 8 +--
.../test/operator/RepartitionWindowApp.java | 6 +-
.../samza/test/operator/SessionWindowApp.java | 6 +-
.../samza/test/operator/TumblingWindowApp.java | 6 +-
.../test/processor/TestStreamApplication.java | 6 +-
.../apache/samza/test/table/TestLocalTable.java | 8 +--
.../table/TestLocalTableWithSideInputs.java | 8 +--
59 files changed, 518 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
index 5423e2e..849b2b3 100644
--- a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
@@ -23,19 +23,29 @@ import org.apache.samza.application.descriptors.ApplicationDescriptor;
/**
- * The base interface for all user-implemented applications in Samza.
+ * A {@link SamzaApplication} describes the inputs, outputs, state, configuration and the logic
+ * for processing data from one or more streaming sources.
* <p>
- * The main processing logic of the user application should be implemented in {@link SamzaApplication#describe(ApplicationDescriptor)}
- * method. Sub-classes {@link StreamApplication} and {@link TaskApplication} are specific interfaces for applications
- * written in high-level DAG and low-level task APIs, respectively.
+ * This is the base {@link SamzaApplication}. Implement a {@link StreamApplication} to describe the
+ * processing logic using Samza's High Level API in terms of {@link org.apache.samza.operators.MessageStream}
+ * operators, or a {@link TaskApplication} to describe it using Samza's Low Level API in terms of per-message
+ * processing logic.
+ * <p>
+ * A {@link SamzaApplication} implementation must have a no-argument constructor, which will be used by the framework
+ * to create new instances and call {@link #describe(ApplicationDescriptor)}.
+ * <p>
+ * Per-container context may be managed using {@link org.apache.samza.context.ApplicationContainerContext} and
+ * set using {@link ApplicationDescriptor#withApplicationContainerContextFactory}. Similarly, per-task context
+ * may be managed using {@link org.apache.samza.context.ApplicationTaskContext} and set using
+ * {@link ApplicationDescriptor#withApplicationTaskContextFactory}.
*/
@InterfaceStability.Evolving
public interface SamzaApplication<S extends ApplicationDescriptor> {
/**
- * Describes the user processing logic via {@link ApplicationDescriptor}
+ * Describes the inputs, outputs, state, configuration and processing logic using the provided {@code appDescriptor}.
*
- * @param appDesc the {@link ApplicationDescriptor} object to describe user application logic
+ * @param appDescriptor the {@link ApplicationDescriptor} to use for describing the application.
*/
- void describe(S appDesc);
+ void describe(S appDescriptor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index fe77045..3749b58 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -21,22 +21,38 @@ package org.apache.samza.application;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+
/**
- * Describes and initializes the transforms for processing message streams and generating results in high-level API.
+ * A {@link StreamApplication} describes the inputs, outputs, state, configuration and the processing logic
+ * in Samza's High Level API.
+ * <p>
+ * A typical {@link StreamApplication} implementation consists of the following stages:
+ * <ol>
+ * <li>Configuring the inputs, outputs and state (tables) using the appropriate
+ * {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
+ * {@link org.apache.samza.system.descriptors.InputDescriptor}s,
+ * {@link org.apache.samza.system.descriptors.OutputDescriptor}s and
+ * {@link org.apache.samza.table.descriptors.TableDescriptor}s
+ * <li>Obtaining the corresponding
+ * {@link org.apache.samza.operators.MessageStream}s,
+ * {@link org.apache.samza.operators.OutputStream}s and
+ * {@link org.apache.samza.table.Table}s from the provided {@link StreamApplicationDescriptor}.
+ * <li>Defining the processing logic using operators and functions on the streams and tables thus obtained.
+ * E.g., {@link org.apache.samza.operators.MessageStream#filter(org.apache.samza.operators.functions.FilterFunction)}
+ * </ol>
* <p>
- * The following example removes page views older than 1 hour from the input stream:
+ * The following example {@link StreamApplication} removes page views older than 1 hour from the input stream:
* <pre>{@code
* public class PageViewFilter implements StreamApplication {
- * public void describe(StreamAppDescriptor appDesc) {
- * KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+ * public void describe(StreamApplicationDescriptor appDescriptor) {
+ * KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
- * trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
- *
+ * trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
- * trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)));
+ * trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
*
- * MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
- * OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor);
+ * MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+ * OutputStream<PageViewEvent> recentPageViewEvents = appDescriptor.getOutputStream(outputStreamDescriptor);
*
* pageViewEvents
* .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
@@ -44,33 +60,20 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
* }
* }
* }</pre>
- *<p>
- * The example above can be run using an ApplicationRunner:
- * <pre>{@code
- * public static void main(String[] args) {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * PageViewFilter app = new PageViewFilter();
- * ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
- * runner.run();
- * runner.waitForFinish();
- * }
- * }</pre>
- *
+ * <p>
+ * All operator function implementations used in a {@link StreamApplication} must be {@link java.io.Serializable}. Any
+ * context required within an operator function may be managed by implementing the
+ * {@link org.apache.samza.operators.functions.InitableFunction#init} and
+ * {@link org.apache.samza.operators.functions.ClosableFunction#close} methods in the function implementation.
+ * <p>
+ * Functions may implement the {@link org.apache.samza.operators.functions.ScheduledFunction} interface
+ * to schedule and receive periodic callbacks from the Samza framework.
* <p>
* Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask}
* during execution. The execution planner will generate a serialized DAG which will be deserialized in each
* {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous
- * and thread-safe within each {@link org.apache.samza.task.StreamTask}.
- *
- * <p>
- * A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor
- * with no parameters to ensure successful instantiation in both local and remote environments.
- * Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},
- * {@link org.apache.samza.operators.functions.FilterFunction} for e.g.) are initable and closable. They are initialized
- * before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask}
- * instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
- * Function implementations are required to be {@link java.io.Serializable}.
+ * and thread-safe within each {@link org.apache.samza.task.StreamTask}. Multiple tasks may process their
+ * messages concurrently depending on the job parallelism configuration.
*/
@InterfaceStability.Evolving
public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
index d84aa12..4210393 100644
--- a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
@@ -23,64 +23,49 @@ import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
/**
- * Describes and initializes the transforms for processing message streams and generating results in low-level API. Your
- * application is expected to implement this interface.
+ * A {@link TaskApplication} describes the inputs, outputs, state, configuration and the processing logic
+ * in Samza's Low Level API.
+ * A typical {@link TaskApplication} implementation consists of the following stages:
+ * <ol>
+ * <li>Configuring the inputs, outputs and state (tables) using the appropriate
+ * {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
+ * {@link org.apache.samza.system.descriptors.StreamDescriptor}s and
+ * {@link org.apache.samza.table.descriptors.TableDescriptor}s.
+ * <li>Adding these descriptors to the provided {@link TaskApplicationDescriptor}.
+ * <li>Defining the processing logic by implementing a {@link org.apache.samza.task.StreamTask} or
+ * {@link org.apache.samza.task.AsyncStreamTask} that operates on each
+ * {@link org.apache.samza.system.IncomingMessageEnvelope} one at a time.
+ * <li>Setting a {@link org.apache.samza.task.TaskFactory} using
+ * {@link TaskApplicationDescriptor#setTaskFactory(org.apache.samza.task.TaskFactory)} that creates instances of the
+ * task above. The {@link org.apache.samza.task.TaskFactory} implementation must be {@link java.io.Serializable}.
+ * </ol>
* <p>
- * The following example removes page views older than 1 hour from the input stream:
+ * The following example {@link TaskApplication} removes page views older than 1 hour from the input stream:
* <pre>{@code
* public class PageViewFilter implements TaskApplication {
- * public void describe(TaskAppDescriptor appDesc) {
- * KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor(PageViewTask.SYSTEM);
+ * public void describe(TaskApplicationDescriptor appDescriptor) {
+ * KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
- * trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new JsonSerdeV2<>(PageViewEvent.class));
- *
+ * trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
* KafkaOutputDescriptor<PageViewEvent>> outputStreamDescriptor =
- * trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new JsonSerdeV2<>(PageViewEvent.class)));
+ * trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
*
- * appDesc.addInputStream(inputStreamDescriptor);
- * appDesc.addOutputStream(outputStreamDescriptor);
- * appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
+ * appDescriptor.addInputStream(inputStreamDescriptor);
+ * appDescriptor.addOutputStream(outputStreamDescriptor);
+ * appDescriptor.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
* }
* }
*
* public class PageViewTask implements StreamTask {
- * final static String TASK_INPUT = "pageViewEvents";
- * final static String TASK_OUTPUT = "recentPageViewEvents";
- * final static String SYSTEM = "kafka";
- *
- * public void process(IncomingMessageEnvelope message, MessageCollector collector,
- * TaskCoordinator coordinator) {
+ * public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
* PageViewEvent m = (PageViewEvent) message.getValue();
* if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) {
- * collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, TASK_OUTPUT),
- * message.getKey(), message.getKey(), m));
+ * collector.send(new OutgoingMessageEnvelope(
+ * new SystemStream("tracking", "recentPageViewEvent"), message.getKey(), message.getKey(), m));
* }
* }
* }
* }</pre>
- *
- *<p>
- * The example above can be run using an ApplicationRunner:
- * <pre>{@code
- * public static void main(String[] args) {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * PageViewFilter app = new PageViewFilter();
- * ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
- * runner.run();
- * runner.waitForFinish();
- * }
- * }</pre>
- *
- * <p>
- * Implementation Notes: {@link TaskApplication} allow users to instantiate {@link org.apache.samza.task.StreamTask} or
- * {@link org.apache.samza.task.AsyncStreamTask} when describing the processing logic. A new {@link TaskApplicationDescriptor }
- * instance will be created and described by the user-defined {@link TaskApplication} when planning the execution.
- * {@link org.apache.samza.task.TaskFactory} is required to be serializable.
- *
- * <p>
- * The user-implemented {@link TaskApplication} class must be a class with proper fully-qualified class name and
- * a default constructor with no parameters to ensure successful instantiation in both local and remote environments.
*/
@InterfaceStability.Evolving
public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
index b2d54ca..6a4c9fd 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
@@ -29,32 +29,42 @@ import org.apache.samza.system.descriptors.SystemDescriptor;
/**
- * The interface class to describe the configuration, input and output streams, and processing logic in a
+ * An {@link ApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
+ * processing logic for a {@link org.apache.samza.application.SamzaApplication}.
+ * <p>
+ * This is the base {@link ApplicationDescriptor} and provides functionality common to every
* {@link org.apache.samza.application.SamzaApplication}.
+ * {@link org.apache.samza.application.StreamApplication#describe} will provide access to a
+ * {@link StreamApplicationDescriptor} with additional functionality for describing High Level API applications.
+ * Similarly, {@link org.apache.samza.application.TaskApplication#describe} will provide access to a
+ * {@link TaskApplicationDescriptor} with additional functionality for describing Low Level API applications.
* <p>
- * Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for
- * applications written in high-level {@link org.apache.samza.application.StreamApplication} and low-level
- * {@link org.apache.samza.application.TaskApplication} APIs, respectively.
- *
- * @param <S> sub-class of user application descriptor.
+ * Use the {@link ApplicationDescriptor} to set the container scope context factory using
+ * {@link ApplicationDescriptor#withApplicationContainerContextFactory}, and task scope context factory using
+ * {@link ApplicationDescriptor#withApplicationTaskContextFactory}. Please note that the terms {@code container}
+ * and {@code task} here refer to the units of physical and logical parallelism, not the programming API.
*/
@InterfaceStability.Evolving
public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
/**
- * Get the {@link Config} of the application
- * @return config of the application
+ * Get the configuration for the application.
+ * @return config for the application
*/
Config getConfig();
/**
- * Sets the default SystemDescriptor to use for the application. This is equivalent to setting
- * {@code job.default.system} and its properties in configuration.
+ * Sets the {@link SystemDescriptor} for the default system for the application.
+ * <p>
+ * The default system is used by the framework for creating any internal (e.g., coordinator, changelog, checkpoint)
+ * streams. In an {@link org.apache.samza.application.StreamApplication}, it is also used for creating any
+ * intermediate streams; e.g., those created by the {@link org.apache.samza.operators.MessageStream#partitionBy} and
+ * {@link org.apache.samza.operators.MessageStream#broadcast} operators.
* <p>
* If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
*
- * @param defaultSystemDescriptor the default system descriptor to use
- * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
+ * @param defaultSystemDescriptor the {@link SystemDescriptor} for the default system for the application
+ * @return this {@link ApplicationDescriptor}
*/
S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
@@ -64,10 +74,11 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
* context can be accessed through the {@link org.apache.samza.context.Context}.
* <p>
* Setting this is optional.
+ * <p>
+ * The provided {@code factory} instance must be {@link java.io.Serializable}.
*
* @param factory the {@link ApplicationContainerContextFactory} for this application
- * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
- * {@link ApplicationContainerContextFactory}
+ * @return this {@link ApplicationDescriptor}
*/
S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
@@ -77,31 +88,37 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
* accessed through the {@link org.apache.samza.context.Context}.
* <p>
* Setting this is optional.
+ * <p>
+ * The provided {@code factory} instance must be {@link java.io.Serializable}.
*
* @param factory the {@link ApplicationTaskContextFactory} for this application
- * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
- * {@link ApplicationTaskContextFactory}
+ * @return this {@link ApplicationDescriptor}
*/
S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);
/**
* Sets the {@link ProcessorLifecycleListenerFactory} for this application.
- *
- * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
+ * <p>
+ * Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
* plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
* the application.
+ * <p>
+ * The provided {@code listenerFactory} instance must be {@link java.io.Serializable}.
*
* @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
* with callback methods before and after the start/stop of each StreamProcessor in the application
- * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
+ * @return this {@link ApplicationDescriptor}
*/
S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
/**
- * Sets a set of customized {@link MetricsReporterFactory}s in the application
+ * Sets the {@link org.apache.samza.metrics.MetricsReporterFactory}s for creating the
+ * {@link org.apache.samza.metrics.MetricsReporter}s to use for the application.
+ * <p>
+ * The provided {@link MetricsReporterFactory} instances must be {@link java.io.Serializable}.
*
- * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
- * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
+ * @param reporterFactories a map of {@link org.apache.samza.metrics.MetricsReporter} names to their factories.
+ * @return this {@link ApplicationDescriptor}
*/
S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
index 3a35054..4a77c6c 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
@@ -29,7 +29,16 @@ import org.apache.samza.table.Table;
/**
- * The interface class to describe a {@link org.apache.samza.application.SamzaApplication} in high-level API in Samza.
+ * A {@link StreamApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
+ * processing logic for a Samza High Level API {@link org.apache.samza.application.StreamApplication}.
+ * <p>
+ * Use the {@link StreamApplicationDescriptor} obtained from
+ * {@link org.apache.samza.application.StreamApplication#describe} to get the {@link MessageStream}s,
+ * {@link OutputStream}s and {@link Table}s corresponding to their respective {@link InputDescriptor}s,
+ * {@link OutputDescriptor}s and {@link TableDescriptor}s.
+ * <p>
+ * Use the {@link MessageStream} API operators to describe the processing logic for the
+ * {@link org.apache.samza.application.StreamApplication}.
*/
@InterfaceStability.Evolving
public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
index 4730297..b395a06 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
@@ -26,18 +26,28 @@ import org.apache.samza.task.TaskFactory;
/**
- * The interface to describe a {@link org.apache.samza.application.SamzaApplication} that uses low-level API task
- * for processing.
+ * A {@link TaskApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
+ * processing logic for a Samza Low Level API {@link org.apache.samza.application.TaskApplication}.
+ * <p>
+ * Use the {@link TaskApplicationDescriptor} obtained from {@link org.apache.samza.application.TaskApplication#describe}
+ * to add the {@link InputDescriptor}s, {@link OutputDescriptor}s and {@link TableDescriptor}s for streams and
+ * tables to be used in the task implementation.
+ * <p>
+ * Use {@link #setTaskFactory} to set the factory for the {@link org.apache.samza.task.StreamTask} or
+ * {@link org.apache.samza.task.AsyncStreamTask} implementation that contains the processing logic for
+ * the {@link org.apache.samza.application.TaskApplication}.
*/
@InterfaceStability.Evolving
public interface TaskApplicationDescriptor extends ApplicationDescriptor<TaskApplicationDescriptor> {
/**
- * Sets the {@link TaskFactory} for the user application. The {@link TaskFactory#createInstance()} creates task instance
- * that implements the main processing logic of the user application.
+ * Sets the {@link org.apache.samza.task.StreamTaskFactory} or {@link org.apache.samza.task.AsyncStreamTaskFactory}
+ * for the {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask} implementation
+ * that contains the processing logic for the {@link org.apache.samza.application.TaskApplication}.
+ * <p>
+ * The provided {@code factory} instance must be {@link java.io.Serializable}.
*
- * @param factory the {@link TaskFactory} including the low-level task processing logic. The only allowed task factory
- * classes are {@link org.apache.samza.task.StreamTaskFactory} and {@link org.apache.samza.task.AsyncStreamTaskFactory}.
+ * @param factory the {@link TaskFactory} for the Low Level API Task implementation
*/
void setTaskFactory(TaskFactory factory);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
index aab8c7f..8ac34a5 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
@@ -18,34 +18,42 @@
*/
package org.apache.samza.context;
+
/**
- * An application should implement this to contain any runtime objects required by processing logic which can be shared
- * across all tasks in a container. A single instance of this will be created in each container. Note that if the
- * container moves or the container model changes (e.g. container failure/rebalancing), then this will be recreated.
+ * An {@link ApplicationContainerContext} instance can be used for holding per-container runtime state and objects
+ * and managing their lifecycle. This context is shared across all tasks in the container.
+ * <p>
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationContainerContextFactory}
+ * to provide the {@link ApplicationContainerContextFactory}. Use {@link Context#getApplicationContainerContext()} to
+ * get the created {@link ApplicationContainerContext} instance for the current container.
* <p>
- * This needs to be created by an implementation of {@link ApplicationContainerContextFactory}. The factory should
- * create the runtime objects contained within this context.
+ * A unique instance of {@link ApplicationContainerContext} is created in each container. If the
+ * container moves or the container model changes (e.g. due to failure or re-balancing), a new instance is created.
* <p>
- * This is related to {@link ContainerContext} in that they are both associated with the container lifecycle. In order
- * to access this in application code, use {@link Context#getApplicationContainerContext()}. The
- * {@link ContainerContext} is accessible through {@link Context#getContainerContext()}.
+ * Use the {@link ApplicationContainerContextFactory} to create any runtime state and objects, and the
+ * {@link ApplicationContainerContext#start()} and {@link ApplicationContainerContext#stop()} methods to
+ * manage their lifecycle.
* <p>
- * If it is necessary to have a separate instance per task, then use {@link ApplicationTaskContext} instead.
+ * Use {@link ApplicationTaskContext} to hold unique runtime state and objects for each task within a container.
+ * Use {@link ContainerContext} to access framework-provided context for a container.
* <p>
- * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments.
+ * Unlike its {@link ApplicationContainerContextFactory}, an implementation does not need to be
+ * {@link java.io.Serializable}.
*/
public interface ApplicationContainerContext {
/**
- * Lifecycle logic which will run after tasks in the container are initialized but before processing begins.
+ * Starts this {@link ApplicationContainerContext} before any tasks in the container are initialized and before
+ * processing begins.
* <p>
- * If this throws an exception, then the container will fail to start.
+ * If this throws an exception, the container will fail to start.
*/
void start();
/**
- * Lifecycle logic which will run after processing ends but before tasks in the container are closed.
+ * Stops this {@link ApplicationContainerContext} after processing ends and after all tasks in the container
+ * are closed.
* <p>
- * If this throws an exception, then the container will fail to fully shut down.
+ * If this throws an exception, the container will fail to fully shut down.
*/
void stop();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
index 074b0b4..a8c9f7c 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
@@ -22,24 +22,25 @@ import java.io.Serializable;
/**
- * An application should implement this if it has a {@link ApplicationContainerContext} that is needed for
- * initialization.
+ * The factory for creating {@link ApplicationContainerContext} instances for a
+ * {@link org.apache.samza.application.SamzaApplication} during container initialization.
* <p>
- * This will be called to create an instance of {@link ApplicationContainerContext} during the container initialization
- * stage. At that stage, the framework-provided job-level and container-level contexts are available for creating the
- * {@link ApplicationContainerContext}.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationContainerContextFactory} to
+ * provide the {@link ApplicationContainerContextFactory}. Use {@link Context#getApplicationContainerContext()} to
+ * get the created {@link ApplicationContainerContext} instance for the current container.
* <p>
- * This is {@link Serializable} because it is specified in the
- * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
- * @param <T> concrete type of {@link ApplicationContainerContext} returned by this factory
+ * The {@link ApplicationContainerContextFactory} implementation must be {@link Serializable}.
+ *
+ * @param <T> concrete type of {@link ApplicationContainerContext} created by this factory
*/
public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> extends Serializable {
+
/**
- * Create an instance of the application-defined {@link ApplicationContainerContext}.
+ * Creates an instance of the application-defined {@link ApplicationContainerContext}.
*
- * @param jobContext framework-provided job context used for building {@link ApplicationContainerContext}
- * @param containerContext framework-provided container context used for building {@link ApplicationContainerContext}
- * @return new instance of the application-defined {@link ApplicationContainerContext}
+ * @param jobContext framework-provided job context
+ * @param containerContext framework-provided container context
+ * @return a new instance of the application-defined {@link ApplicationContainerContext}
*/
T create(JobContext jobContext, ContainerContext containerContext);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
index 6afbf23..a4236bf 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
@@ -18,34 +18,38 @@
*/
package org.apache.samza.context;
+
/**
- * An application should implement this to contain any runtime objects required by processing logic which cannot be
- * shared across tasks. A new instance of this will be created for each task.
+ * An {@link ApplicationTaskContext} instance can be used for holding per-task runtime state and objects and managing
+ * their lifecycle in an {@link org.apache.samza.application.SamzaApplication}.
* <p>
- * This needs to be created by an implementation of {@link ApplicationTaskContextFactory}. The factory should create
- * the runtime objects contained within this context.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationTaskContextFactory}
+ * to provide the {@link ApplicationTaskContextFactory}. Use {@link Context#getApplicationTaskContext()} to get
+ * the created {@link ApplicationTaskContext} instance for the current task.
* <p>
- * This is related to {@link TaskContext} in that they are both associated with a task lifecycle. In order to access
- * this in application code, use {@link Context#getApplicationTaskContext()}. The {@link TaskContext} is accessible
- * through {@link Context#getTaskContext()}.
+ * A unique instance of {@link ApplicationTaskContext} is created for each task in a container.
+ * Use the {@link ApplicationTaskContextFactory} to create any runtime state and objects, and the
+ * {@link ApplicationTaskContext#start()} and {@link ApplicationTaskContext#stop()} methods to manage their lifecycle.
* <p>
- * If it is possible to share an instance of this across tasks in a container, then use
- * {@link ApplicationContainerContext} instead.
+ * Use {@link ApplicationContainerContext} to hold runtime state and objects shared across all tasks within a container.
+ * Use {@link TaskContext} to access framework-provided context for a task.
* <p>
- * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments.
+ * Unlike its {@link ApplicationTaskContextFactory}, an implementation does not need to be
+ * {@link java.io.Serializable}.
*/
public interface ApplicationTaskContext {
+
/**
- * Lifecycle logic which will run after tasks are initialized but before processing begins.
+ * Starts this {@link ApplicationTaskContext} after its task is initialized but before any messages are processed.
* <p>
- * If this throws an exception, then the container will fail to start.
+ * If this throws an exception, the container will fail to start.
*/
void start();
/**
- * Lifecycle logic which will run after processing ends but before tasks are closed.
+ * Stops this {@link ApplicationTaskContext} after processing ends but before its task is closed.
* <p>
- * If this throws an exception, then the container will fail to fully shut down.
+ * If this throws an exception, the container will fail to fully shut down.
*/
void stop();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
index 619bbc7..c00935f 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
@@ -22,27 +22,27 @@ import java.io.Serializable;
/**
- * An application should implement this if it has a {@link ApplicationTaskContext} that is needed for
- * initialization. This will be used to create instance(s) of that {@link ApplicationTaskContext}.
+ * The factory for creating {@link ApplicationTaskContext} instances for a
+ * {@link org.apache.samza.application.SamzaApplication} during task initialization.
* <p>
- * This will be called to create an instance of {@link ApplicationTaskContext} during the initialization stage of each
- * task. At that stage, the framework-provided job-level, container-level, and task-level contexts are available for
- * creating the {@link ApplicationTaskContext}. Also, the application-defined container-level context is available.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationTaskContextFactory} to
+ * provide the {@link ApplicationTaskContextFactory}. Use {@link Context#getApplicationTaskContext()} to
+ * get the created {@link ApplicationTaskContext} instance for the current task.
* <p>
- * This is {@link Serializable} because it is specified in the
- * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
- * @param <T> concrete type of {@link ApplicationTaskContext} returned by this factory
+ * The {@link ApplicationTaskContextFactory} implementation must be {@link Serializable}.
+ *
+ * @param <T> concrete type of {@link ApplicationTaskContext} created by this factory
*/
public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> extends Serializable {
+
/**
- * Create an instance of the application-defined {@link ApplicationTaskContext}.
+ * Creates an instance of the application-defined {@link ApplicationTaskContext}.
*
- * @param jobContext framework-provided job context used for building {@link ApplicationTaskContext}
- * @param containerContext framework-provided container context used for building {@link ApplicationTaskContext}
- * @param taskContext framework-provided task context used for building {@link ApplicationTaskContext}
- * @param applicationContainerContext application-provided container context used for building
- * {@link ApplicationTaskContext}
- * @return new instance of the application-defined {@link ApplicationContainerContext}
+ * @param jobContext framework-provided job context
+ * @param containerContext framework-provided container context
+ * @param taskContext framework-provided task context
+ * @param applicationContainerContext application-defined container context
+ * @return a new instance of the application-defined {@link ApplicationTaskContext}
*/
T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
ApplicationContainerContext applicationContainerContext);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java b/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java
index 51d7918..97c5f53 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java
@@ -23,24 +23,25 @@ import org.apache.samza.metrics.MetricsRegistry;
/**
- * Contains information at container granularity, provided by the Samza framework, to be used to instantiate an
- * application at runtime.
+ * The framework-provided context for the current container.
* <p>
- * Note that application-defined container-level context is accessible through
- * {@link ApplicationContainerContext}.
+ * Use {@link ApplicationContainerContext} for the application-defined context for the current container.
*/
public interface ContainerContext {
+
/**
- * Returns the {@link ContainerModel} associated with this container. This contains information like the id and the
- * associated {@link org.apache.samza.job.model.TaskModel}s.
- * @return {@link ContainerModel} associated with this container
+ * Gets the {@link ContainerModel} for this container, which contains this container's id and
+ * its {@link org.apache.samza.job.model.TaskModel}s.
+ *
+ * @return the {@link ContainerModel} for this container
*/
ContainerModel getContainerModel();
/**
- * Returns the {@link MetricsRegistry} for this container. Metrics built using this registry will be associated with
- * the container.
- * @return {@link MetricsRegistry} for this container
+ * Gets the {@link MetricsRegistry} for this container, which can be used to register metrics that are
+ * reported per container.
+ *
+ * @return the {@link MetricsRegistry} for this container
*/
MetricsRegistry getContainerMetricsRegistry();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/Context.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/Context.java b/samza-api/src/main/java/org/apache/samza/context/Context.java
index bfe66d3..e111127 100644
--- a/samza-api/src/main/java/org/apache/samza/context/Context.java
+++ b/samza-api/src/main/java/org/apache/samza/context/Context.java
@@ -19,59 +19,59 @@
package org.apache.samza.context;
/**
- * Container object for all context provided to instantiate an application at runtime.
+ * A holder for all framework and application defined contexts at runtime.
*/
public interface Context {
/**
- * Returns the framework-provided context for the overall job that is being run.
- * @return framework-provided job context
+ * Gets the framework-provided context for the job.
+ *
+ * @return the framework-provided job context
*/
JobContext getJobContext();
/**
- * Returns the framework-provided context for the container that this is in.
+ * Gets the framework-provided context for the current container. This context is shared by all tasks within
+ * the container.
* <p>
- * Note that this is not the application-defined container context. Use
- * {@link Context#getApplicationContainerContext()} to get the application-defined container context.
- * @return framework-provided container context
+ * Use {@link #getApplicationContainerContext()} to get the application-defined container context.
+ *
+ * @return the framework-provided container context
*/
ContainerContext getContainerContext();
/**
- * Returns the framework-provided context for the task that that this is in.
+ * Gets the framework-provided context for the current task.
* <p>
- * Note that this is not the application-defined task context. Use {@link Context#getApplicationTaskContext()}
- * to get the application-defined task context.
- * @return framework-provided task context
+ * Use {@link #getApplicationTaskContext()} to get the application-defined task context.
+ *
+ * @return the framework-provided task context
*/
TaskContext getTaskContext();
/**
- * Returns the application-defined container context object specified by the
- * {@link ApplicationContainerContextFactory}. This is shared across all tasks in the container, but not across
- * containers.
+ * Gets the application-defined context for the current container. This context is shared by all tasks within
+ * the container.
* <p>
- * In order to use this in application code, it should be casted to the concrete type that corresponds to the
- * {@link ApplicationContainerContextFactory}.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationContainerContextFactory}
+ * to provide a factory for this context. Cast the returned context to the concrete implementation type to use it.
* <p>
- * Note that this is not the framework-provided container context. Use {@link Context#getContainerContext()} to get
- * the framework-provided container context.
- * @return application-defined container context
- * @throws IllegalStateException if no context could be built (e.g. no factory provided)
+ * Use {@link #getContainerContext()} to get the framework-provided container context.
+ *
+ * @return the application-defined container context
+ * @throws IllegalStateException if no {@link ApplicationContainerContextFactory} was provided for the application
*/
ApplicationContainerContext getApplicationContainerContext();
/**
- * Returns the application-defined task context object specified by the {@link ApplicationTaskContextFactory}.
- * Each task will have a separate instance of this.
+ * Gets the application-defined task context for the current task. This context is unique to this task.
* <p>
- * In order to use this in application code, it should be casted to the concrete type that corresponds to the
- * {@link ApplicationTaskContextFactory}.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationTaskContextFactory}
+ * to provide a factory for this context. Cast the returned context to the concrete implementation type to use it.
* <p>
- * Note that this is not the framework-provided task context. Use {@link Context#getTaskContext()} to get the
- * framework-provided task context.
- * @return application-defined task context
- * @throws IllegalStateException if no context could be built (e.g. no factory provided)
+ * Use {@link #getTaskContext()} to get the framework-provided task context.
+ *
+ * @return the application-defined task context
+ * @throws IllegalStateException if no {@link ApplicationTaskContextFactory} was provided for the application
*/
ApplicationTaskContext getApplicationTaskContext();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/JobContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
index 239a011..8e41980 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
@@ -22,26 +22,28 @@ import org.apache.samza.config.Config;
/**
- * Contains information at job granularity, provided by the Samza framework, to be used to instantiate an application at
- * runtime.
+ * The framework-provided context for the job.
*/
public interface JobContext {
+
/**
- * Returns the final configuration for this job.
- * @return configuration for this job
+ * Gets the final configuration for this job.
+ *
+ * @return the configuration for this job
*/
Config getConfig();
/**
- * Returns the name of the job.
- * @return name of the job
- * @throws org.apache.samza.SamzaException if the job name was not configured
+ * Gets the name of the job.
+ *
+ * @return the name of this job
*/
String getJobName();
/**
- * Returns the instance id for this instance of this job.
- * @return instance id for the job
+ * Gets the id for this job.
+ *
+ * @return the id for this job
*/
String getJobId();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
index d29f6a5..3a5333c 100644
--- a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
@@ -28,54 +28,65 @@ import org.apache.samza.table.Table;
/**
- * Contains information at task granularity, provided by the Samza framework, to be used to instantiate an application
- * at runtime.
+ * The framework-provided context for the current task.
* <p>
- * Note that application-defined task-level context is accessible through {@link ApplicationTaskContext}.
+ * Use {@link ApplicationTaskContext} for the application-defined context for the current task.
*/
public interface TaskContext {
+
/**
- * Returns the {@link TaskModel} associated with this task. This contains information like the task name and
- * associated {@link SystemStreamPartition}s.
- * @return {@link TaskModel} associated with this task
+ * Gets the {@link TaskModel} for this task, which contains this task's name and its {@link SystemStreamPartition}s.
+ *
+ * @return the {@link TaskModel} for this task
*/
TaskModel getTaskModel();
/**
- * Returns the {@link MetricsRegistry} for this task. Metrics built using this registry will be associated with the
- * task.
- * @return {@link MetricsRegistry} for this task
+ * Gets the {@link MetricsRegistry} for this task, which can be used to register metrics that are reported per task.
+ *
+ * @return the {@link MetricsRegistry} for this task
*/
MetricsRegistry getTaskMetricsRegistry();
/**
- * Returns the {@link KeyValueStore} corresponding to the {@code storeName}. In application code, it is recommended to
- * cast the resulting stores to {@link KeyValueStore}s with the correct concrete type parameters.
- * @param storeName name of the {@link KeyValueStore} to get
- * @return {@link KeyValueStore} corresponding to the {@code storeName}
+ * Gets the {@link KeyValueStore} associated with {@code storeName} for this task.
+ * <p>
+ * The returned store should be cast with the concrete type parameters based on the configured store serdes.
+ * E.g., if using string key and integer value serde, it should be cast to a {@code KeyValueStore<String, Integer>}.
+ *
+ * @param storeName name of the {@link KeyValueStore} to get for this task
+ * @return the {@link KeyValueStore} associated with {@code storeName} for this task
* @throws IllegalArgumentException if there is no store associated with {@code storeName}
*/
KeyValueStore<?, ?> getStore(String storeName);
/**
- * Returns the {@link Table} corresponding to the {@code tableId}. In application code, it is recommended to cast this
- * to the resulting tables to {@link Table}s with the correct concrete type parameters.
+ * Gets the {@link Table} corresponding to the {@code tableId} for this task.
+ * <p>
+ * The returned table should be cast with the concrete type parameters based on the configured table serdes, and
+ * whether it is {@link org.apache.samza.table.ReadWriteTable} or {@link org.apache.samza.table.ReadableTable}.
+ * E.g., if using string key and integer value serde for a writable table, it should be cast to a
+ * {@code ReadWriteTable<String, Integer>}.
+ *
* @param tableId id of the {@link Table} to get
- * @return {@link Table} corresponding to the {@code tableId}
+ * @return the {@link Table} associated with {@code tableId} for this task
* @throws IllegalArgumentException if there is no table associated with {@code tableId}
*/
Table<?> getTable(String tableId);
/**
- * Returns a task-level {@link CallbackScheduler} which can be used to delay execution of some logic.
- * @return {@link CallbackScheduler} for this task
+ * Gets the {@link CallbackScheduler} for this task, which can be used to schedule a callback to be executed
+ * at a future time.
+ *
+ * @return the {@link CallbackScheduler} for this task
*/
CallbackScheduler getCallbackScheduler();
/**
- * Set the starting offset for the given {@link SystemStreamPartition}. Offsets can only be set for a
- * {@link SystemStreamPartition} assigned to this task. The {@link SystemStreamPartition}s assigned to this task can
- * be accessed through {@link TaskModel#getSystemStreamPartitions()} for the {@link TaskModel} obtained by calling
+ * Sets the starting offset for the given {@link SystemStreamPartition}.
+ * <p> Offsets can only be set for a {@link SystemStreamPartition} assigned to this task.
+ * The {@link SystemStreamPartition}s assigned to this task can be accessed through
+ * {@link TaskModel#getSystemStreamPartitions()} for the {@link TaskModel} obtained by calling
* {@link #getTaskModel()}. Trying to set the offset for any other partition will have no effect.
*
* NOTE: this feature is experimental, and the API may change in a future release.
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
index aa5c8d2..e08fa09 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
@@ -21,16 +21,16 @@ package org.apache.samza.system.descriptors;
import org.apache.samza.serializers.Serde;
/**
- * A descriptor for a generic input stream.
+ * A {@link GenericInputDescriptor} can be used for specifying Samza and system-specific properties of
+ * input streams.
* <p>
- * An instance of this descriptor may be obtained from an appropriately configured
- * {@link GenericSystemDescriptor}.
+ * If the system provides its own system and stream descriptor implementations, use them instead.
+ * Otherwise, use this {@link GenericInputDescriptor} to specify Samza-specific properties of the stream,
+ * and {@link #withStreamConfigs} to specify additional system specific properties.
* <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
- * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
+ * Use {@link GenericSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
index 1d81525..7302c35 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
@@ -21,16 +21,16 @@ package org.apache.samza.system.descriptors;
import org.apache.samza.serializers.Serde;
/**
- * A descriptor for a generic output stream.
+ * A {@link GenericOutputDescriptor} can be used for specifying Samza and system-specific properties of
+ * output streams.
* <p>
- * An instance of this descriptor may be obtained from an appropriately configured
- * {@link GenericSystemDescriptor}.
+ * If the system provides its own system and stream descriptor implementations, use them instead.
+ * Otherwise, use this {@link GenericOutputDescriptor} to specify Samza-specific properties of the stream,
+ * and {@link #withStreamConfigs} to specify additional system specific properties.
* <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
- * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
+ * Use {@link GenericSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
index eb86877..4884cd6 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
@@ -22,13 +22,16 @@ package org.apache.samza.system.descriptors;
import org.apache.samza.serializers.Serde;
/**
- * A descriptor for a generic system.
+ * A {@link GenericSystemDescriptor} can be used for specifying Samza and system-specific properties of an
+ * input/output system. It can also be used for obtaining {@link GenericInputDescriptor}s and
+ * {@link GenericOutputDescriptor}s, which can be used for specifying any Samza and system-specific properties
+ * of input/output streams.
* <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
- * Additional system specific properties may be provided using {@link #withSystemConfigs}
+ * If the system provides its own system and stream descriptor implementations, use them instead.
+ * Otherwise, use this {@link GenericSystemDescriptor} to specify Samza-specific properties of the system,
+ * and {@link #withSystemConfigs} to specify additional system specific properties.
* <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
*/
public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
index fd7a50c..2c6f88b 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
@@ -26,9 +26,13 @@ import org.apache.samza.serializers.Serde;
import org.apache.samza.system.SystemStreamMetadata.OffsetType;
/**
- * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
+ * An {@link InputDescriptor} can be used for specifying Samza and system-specific properties of input streams.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for an input stream. Use a system-specific input descriptor (e.g. KafkaInputDescriptor)
+ * obtained from its system descriptor (e.g. KafkaSystemDescriptor) if one is available. Otherwise use the
+ * {@link GenericInputDescriptor} obtained from a {@link GenericSystemDescriptor}.
*
* @param <StreamMessageType> type of messages in this stream.
* @param <SubClass> type of the concrete sub-class
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
index 898be1e..264c6da 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
@@ -21,9 +21,13 @@ package org.apache.samza.system.descriptors;
import org.apache.samza.serializers.Serde;
/**
- * The base descriptor for an output stream. Allows setting properties that are common to all output streams.
+ * An {@link OutputDescriptor} can be used for specifying Samza and system-specific properties of output streams.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for an output stream. Use a system-specific output descriptor (e.g. KafkaOutputDescriptor)
+ * obtained from its system descriptor (e.g. KafkaSystemDescriptor) if one is available. Otherwise use the
+ * {@link GenericOutputDescriptor} obtained from a {@link GenericSystemDescriptor}.
*
* @param <StreamMessageType> type of messages in this stream.
* @param <SubClass> type of the concrete sub-class
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
index e8e586f..43cab8f 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
@@ -29,9 +29,14 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.samza.serializers.Serde;
/**
- * The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
+ * A {@link StreamDescriptor} can be used for specifying Samza and system-specific properties of input/output streams.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptors.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for an input/output stream. Use a system-specific input/output descriptor
+ * (e.g. KafkaInputDescriptor) obtained from its system descriptor (e.g. KafkaSystemDescriptor) if one is available.
+ * Otherwise use the {@link GenericInputDescriptor} and {@link GenericOutputDescriptor} obtained from a
+ * {@link GenericSystemDescriptor}.
*
* @param <StreamMessageType> type of messages in this stream.
* @param <SubClass> type of the concrete sub-class
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
index 9db2544..813deb1 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
@@ -30,9 +30,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The base descriptor for a system. Allows setting properties that are common to all systems.
+ * A {@link SystemDescriptor} can be used for specifying Samza and system-specific properties of an input/output system.
+ * It can also be used for obtaining {@link InputDescriptor}s and {@link OutputDescriptor}s, which can be used for
+ * specifying Samza and system-specific properties of input/output streams.
* <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for a system. Use a system-specific descriptor (e.g. KafkaSystemDescriptor) if one
+ * is available. Otherwise use the {@link GenericSystemDescriptor}.
* <p>
* Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
* {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/table/Table.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java b/samza-api/src/main/java/org/apache/samza/table/Table.java
index 767e176..76ad460 100644
--- a/samza-api/src/main/java/org/apache/samza/table/Table.java
+++ b/samza-api/src/main/java/org/apache/samza/table/Table.java
@@ -20,9 +20,28 @@ package org.apache.samza.table;
import org.apache.samza.annotation.InterfaceStability;
+
/**
*
- * Marker interface for a table.
+ * A {@link Table} is an abstraction for data sources that support random access by key. It is an
+ * evolution of the existing {@link org.apache.samza.storage.kv.KeyValueStore} API. It offers support for
+ * both local and remote data sources and composition through hybrid tables. For remote data sources,
+ * a {@code RemoteTable} provides optimized access with caching, rate-limiting, and retry support.
+ * <p>
+ * Depending on the implementation, a {@link Table} can be a {@link ReadableTable} or a {@link ReadWriteTable}.
+ * <p>
+ * Use a {@link org.apache.samza.table.descriptors.TableDescriptor} to specify the properties of a {@link Table}.
+ * For High Level API {@link org.apache.samza.application.StreamApplication}s, use
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#getTable} to obtain
+ * the {@link org.apache.samza.table.Table} instance for the descriptor that can be used with the
+ * {@link org.apache.samza.operators.MessageStream} operators like
+ * {@link org.apache.samza.operators.MessageStream#sendTo(Table)}.
+ * Alternatively, use {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.operators.functions.InitableFunction#init} to get the table instance for use within
+ * operator functions.
+ * For Low Level API {@link org.apache.samza.application.TaskApplication}s, use
+ * {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.task.InitableTask#init} to get the table instance for use within the Task.
*
* @param <R> the type of records in the table
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
index 5d7b89e..f1118eb 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -20,25 +20,33 @@ package org.apache.samza.table.descriptors;
import org.apache.samza.annotation.InterfaceStability;
+
/**
- * User facing class to collect metadata that fully describes a
- * Samza table. This interface should be implemented by concrete table implementations.
+ * A {@link TableDescriptor} can be used for specifying Samza and implementation-specific properties of a
+ * {@link org.apache.samza.table.Table}.
* <p>
- * Typical user code should look like the following, notice <code>withConfig()</code>
- * is defined in this class and the rest in subclasses.
- *
- * <pre>
- * {@code
- * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
+ * Table properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for a table. Use an implementation-specific descriptor (e.g. RocksDbTableDescriptor) to
+ * use it in the application. For example:
+ * <pre>{@code
+ * RocksDbTableDescriptor tableDescriptor = new RocksDbTableDescriptor("table",
* KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
* .withBlockSize(1024)
* .withConfig("some-key", "some-value");
* }
* </pre>
-
- * Once constructed, a table descriptor can be registered with the system. Internally,
- * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec},
- * which is used to track tables internally.
+ * For High Level API {@link org.apache.samza.application.StreamApplication}s, use
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#getTable(TableDescriptor)} to obtain
+ * the corresponding {@link org.apache.samza.table.Table} instance that can be used with the
+ * {@link org.apache.samza.operators.MessageStream} operators like
+ * {@link org.apache.samza.operators.MessageStream#sendTo(org.apache.samza.table.Table)}.
+ * Alternatively, use {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.operators.functions.InitableFunction#init} to get the table instance for use within
+ * operator functions.
+ * For Low Level API {@link org.apache.samza.application.TaskApplication}s, use
+ * {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.task.InitableTask#init} to get the table instance for use within the Task.
*
* @param <K> the type of the key in this table
* @param <V> the type of the value in this table
@@ -48,13 +56,14 @@ import org.apache.samza.annotation.InterfaceStability;
public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
/**
- * Get the Id of the table
- * @return Id of the table
+ * Get the id of the table
+ * @return id of the table
*/
String getTableId();
/**
* Add a configuration entry for the table
+ *
* @param key the key
* @param value the value
* @return this table descriptor instance
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
index 8443d20..f9349bd 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
@@ -23,9 +23,9 @@ import org.apache.samza.annotation.InterfaceStability;
/**
- * The interface for all task factories (i.e. {@link StreamTaskFactory} and {@link AsyncStreamTaskFactory}
+ * The base interface for all task factories (i.e. {@link StreamTaskFactory} and {@link AsyncStreamTaskFactory})
*
- * @param <T> the type of task instances
+ * @param <T> the type of task instances created by the factory
*/
@InterfaceStability.Stable
public interface TaskFactory<T> extends Serializable {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
index df22269..c8cc36b 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -32,11 +32,12 @@ import org.apache.samza.system.eventhub.EventHubConfig;
/**
- * A descriptor for the Event Hubs output stream
+ * An {@link EventHubsInputDescriptor} can be used for specifying Samza and EventHubs-specific properties of EventHubs
+ * input streams.
* <p>
- * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ * Use {@link EventHubsSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
index 95f7e42..b3e1c59 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -30,12 +30,14 @@ import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.eventhub.EventHubConfig;
+
/**
- * A descriptor for an Event Hubs output stream
+ * An {@link EventHubsOutputDescriptor} can be used for specifying Samza and EventHubs-specific properties of EventHubs
+ * output streams.
* <p>
- * An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
+ * Use {@link EventHubsSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
index feffd87..2084018 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -32,9 +32,12 @@ import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.Partitio
/**
- * A descriptor for a Event Hubs system.
+ * An {@link EventHubsSystemDescriptor} can be used for specifying Samza and EventHubs-specific properties of an EventHubs
+ * input/output system. It can also be used for obtaining {@link EventHubsInputDescriptor}s and
+ * {@link EventHubsOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
+ * EventHubs input/output streams.
* <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
*/
public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
index 2b29a2b..e9cc302 100644
--- a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
+++ b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
@@ -32,7 +32,7 @@ public final class LegacyTaskApplication implements TaskApplication {
}
@Override
- public void describe(TaskApplicationDescriptor appDesc) {
- appDesc.setTaskFactory(TaskFactoryUtil.getTaskFactory(taskClassName));
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ appDescriptor.setTaskFactory(TaskFactoryUtil.getTaskFactory(taskClassName));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
index 8b96c8a..6baa54e 100644
--- a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
+++ b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
@@ -25,7 +25,7 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
*/
public class MockStreamApplication implements StreamApplication {
@Override
- public void describe(StreamApplicationDescriptor appSpec) {
+ public void describe(StreamApplicationDescriptor appDescriptor) {
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index ab91cee..f620ece 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -91,7 +91,7 @@ public class TestApplicationUtil {
*/
public static class MockTaskApplication implements TaskApplication {
@Override
- public void describe(TaskApplicationDescriptor appSpec) {
+ public void describe(TaskApplicationDescriptor appDescriptor) {
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 6d017cb..299d631 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -818,7 +818,7 @@ public class TestExecutionPlanner {
public static class MockTaskApplication implements SamzaApplication {
@Override
- public void describe(ApplicationDescriptor appDesc) {
+ public void describe(ApplicationDescriptor appDescriptor) {
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f1e7524/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
index fb279ab..d9477e5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
@@ -27,12 +27,14 @@ import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.serializers.Serde;
+
/**
- * A descriptor for a kafka input stream.
+ * A {@link KafkaInputDescriptor} can be used for specifying Samza and Kafka-specific properties of Kafka
+ * input streams.
* <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
+ * Use {@link KafkaSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
* <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/