You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/23 01:37:49 UTC
[beam] branch master updated: [BEAM-11078] Add splittable DoFn
documentation to programming guide (#13160)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a99c81c [BEAM-11078] Add splittable DoFn documentation to programming guide (#13160)
a99c81c is described below
commit a99c81c15c03ed642b9ed90f4a0eea14e1b316b9
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Thu Oct 22 18:37:11 2020 -0700
[BEAM-11078] Add splittable DoFn documentation to programming guide (#13160)
* [BEAM-11078, BEAM-2081] Convert to markdown, no changes of content from http://doc/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE
* Fill in placeholders for missing py snippets.
Make website renderable.
* edit heading
* Update section menu
* Fill in Python code snippets, make minor Java/Go code snippet changes
* Fix lint issues in snippets
* Fix Java snippets
* fix whitespace lint issues in website
* More python snippet lint fixes
* Fix Java snippets spotless error
* Add image to SDF high level overview
* Remove current status link to be re-added back once capability matrix is updated
* Update Java snippets to satisfy spotbugs
* Address PR comments
* Replace 'a SDF' with 'an SDF'
---
.../apache/beam/examples/snippets/Snippets.java | 245 ++++++++++++
.../apache_beam/examples/snippets/snippets.py | 207 ++++++++++
.../content/en/documentation/programming-guide.md | 278 ++++++++++++++
.../partials/section-menu/en/documentation.html | 14 +
.../site/static/images/sdf_high_level_overview.svg | 415 +++++++++++++++++++++
5 files changed, 1159 insertions(+)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index cb55475f..ea8cce2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -29,6 +29,9 @@ import com.google.cloud.language.v1.Entity;
import com.google.cloud.language.v1.Sentence;
import com.google.cloud.language.v1.Token;
import com.google.gson.Gson;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,6 +42,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -59,6 +63,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -70,6 +75,7 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
@@ -80,6 +86,10 @@ import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.TruncateResult;
+import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -1158,4 +1168,239 @@ public class Snippets {
}
}
}
+
+ private static class BundleFinalization { // Holder for the BundleFinalize documentation snippet.
+ private static class BundleFinalizationDoFn extends DoFn<String, Integer> {
+ // [START BundleFinalize]
+ @ProcessElement
+ public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) {
+ // ... produce output ...
+
+ bundleFinalizer.afterBundleCommit( // Registers the lambda below with the bundle finalizer.
+ Instant.now().plus(Duration.standardMinutes(5)), // Five minutes from now; presumably the callback expiry -- confirm against BundleFinalizer.
+ () -> {
+ // ... perform a side effect ...
+ });
+ }
+ // [END BundleFinalize]
+ }
+ }
+
+ private static class SplittableDoFn {
+
+ private static void seekToNextRecordBoundaryInFile(
+ RandomAccessFile file, long initialPosition) {} // Intentionally empty stub; called by the snippets below.
+
+ private static Integer readNextRecord(RandomAccessFile file) {
+ // ... read a record ...
+ return null; // Stub: always returns null and does not touch the file.
+ }
+
+    // [START SDF_BasicExample]
+    @BoundedPerElement
+    private static class FileToWordsFn extends DoFn<String, Integer> {
+      // Pairs each file name with a restriction spanning offsets [0, file length).
+      @GetInitialRestriction
+      public OffsetRange getInitialRestriction(@Element String fileName) throws IOException {
+        return new OffsetRange(0, new File(fileName).length());
+      }
+
+      // Positions the file via seekToNextRecordBoundaryInFile starting from the restriction's
+      // lower bound, then emits one record per successfully claimed file offset.
+      @ProcessElement
+      public void processElement(
+          @Element String fileName,
+          RestrictionTracker<OffsetRange, Long> tracker,
+          OutputReceiver<Integer> outputReceiver)
+          throws IOException {
+        // try-with-resources closes the file on every exit path, including a failed claim
+        // or an IOException while reading.
+        try (RandomAccessFile file = new RandomAccessFile(fileName, "r")) {
+          seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
+          while (tracker.tryClaim(file.getFilePointer())) {
+            outputReceiver.output(readNextRecord(file));
+          }
+        }
+      }
+
+      // Providing the coder is only necessary if it can not be inferred at runtime.
+      @GetRestrictionCoder
+      public Coder<OffsetRange> getRestrictionCoder() {
+        return OffsetRange.Coder.of();
+      }
+    }
+    // [END SDF_BasicExample]
+
+    private static class BasicExampleWithInitialSplitting extends FileToWordsFn {
+      // [START SDF_BasicExampleWithSplitting]
+      // The @SplitRestriction annotation is what marks this method as the initial-split hook;
+      // the method name alone is not enough.
+      @SplitRestriction
+      void splitRestriction(
+          @Restriction OffsetRange restriction, OutputReceiver<OffsetRange> splitReceiver) {
+        long splitSize = 64 * (1 << 20);
+        long i = restriction.getFrom();
+        while (i < restriction.getTo() - splitSize) {
+          // Compute and output 64 MiB size ranges to process in parallel
+          long end = i + splitSize;
+          splitReceiver.output(new OffsetRange(i, end));
+          i = end;
+        }
+        // Output the last range
+        splitReceiver.output(new OffsetRange(i, restriction.getTo()));
+      }
+      // [END SDF_BasicExampleWithSplitting]
+    }
+
+ private static class BasicExampleWithBadTryClaimLoop extends DoFn<String, Integer> { // Deliberate anti-pattern: shows how NOT to write the claim loop.
+ // [START SDF_BadTryClaimLoop]
+ @ProcessElement
+ public void badTryClaimLoop(
+ @Element String fileName,
+ RestrictionTracker<OffsetRange, Long> tracker,
+ OutputReceiver<Integer> outputReceiver)
+ throws IOException {
+ RandomAccessFile file = new RandomAccessFile(fileName, "r");
+ seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
+ // The restriction tracker can be modified by another thread in parallel
+ // so storing state locally is ill advised.
+ long end = tracker.currentRestriction().getTo();
+ while (file.getFilePointer() < end) { // BAD: loops on the locally cached end instead of the tryClaim result.
+ // Only after successfully claiming should we produce any output and/or
+ // perform side effects.
+ tracker.tryClaim(file.getFilePointer()); // BAD: the boolean returned by tryClaim is discarded.
+ outputReceiver.output(readNextRecord(file));
+ }
+ }
+ // [END SDF_BadTryClaimLoop]
+ }
+
+ private static class CustomWatermarkEstimatorExample extends DoFn<String, Integer> {
+ private static Instant currentWatermark = Instant.now(); // Placeholder value handed back by MyCustomWatermarkEstimator.currentWatermark() below.
+
+ // [START SDF_CustomWatermarkEstimator]
+
+ // (Optional) Define a custom watermark state type to save information between bundle
+ // processing rounds.
+ public static class MyCustomWatermarkState {
+ public MyCustomWatermarkState(String element, OffsetRange restriction) {
+ // Store data necessary for future watermark computations
+ }
+ }
+
+ // (Optional) Choose which coder to use to encode the watermark estimator state.
+ @GetWatermarkEstimatorStateCoder
+ public Coder<MyCustomWatermarkState> getWatermarkEstimatorStateCoder() {
+ return AvroCoder.of(MyCustomWatermarkState.class);
+ }
+
+ // Define a WatermarkEstimator
+ public static class MyCustomWatermarkEstimator
+ implements TimestampObservingWatermarkEstimator<MyCustomWatermarkState> {
+
+ public MyCustomWatermarkEstimator(MyCustomWatermarkState type) {
+ // Initialize watermark estimator state
+ }
+
+ @Override
+ public void observeTimestamp(Instant timestamp) {
+ // Will be invoked on each output from the SDF
+ }
+
+ @Override
+ public Instant currentWatermark() {
+ // Return a monotonically increasing value
+ return currentWatermark; // Reads the enclosing class's static placeholder field.
+ }
+
+ @Override
+ public MyCustomWatermarkState getState() {
+ // Return state to resume future watermark estimation after a checkpoint/split
+ return null; // Placeholder in this snippet: the constructor above stores nothing to hand back.
+ }
+ }
+
+ // Then, update the DoFn to generate the initial watermark estimator state for all new element
+ // and restriction pairs and to create a new instance given watermark estimator state.
+
+ @GetInitialWatermarkEstimatorState
+ public MyCustomWatermarkState getInitialWatermarkEstimatorState(
+ @Element String element, @Restriction OffsetRange restriction) {
+ // Compute and return the initial watermark estimator state for each element and
+ // restriction. All subsequent processing of an element and restriction will be restored
+ // from the existing state.
+ return new MyCustomWatermarkState(element, restriction);
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimator<MyCustomWatermarkState> newWatermarkEstimator(
+ @WatermarkEstimatorState MyCustomWatermarkState oldState) {
+ return new MyCustomWatermarkEstimator(oldState); // Builds a fresh estimator from the supplied state.
+ }
+ }
+ // [END SDF_CustomWatermarkEstimator]
+
+ private static class UserInitiatedCheckpointExample extends DoFn<String, Integer> {
+ public static class ThrottlingException extends Exception {}
+
+ public static class ElementNotReadyException extends Exception {} // Not referenced anywhere else in this example class.
+
+ private Service initializeService() {
+ return null; // Stub: the snippet never builds a real Service.
+ }
+
+ public interface Service {
+ List<Record> readNextRecords(long position) throws ThrottlingException;
+ }
+
+ public interface Record {
+ long getPosition();
+ }
+
+ // [START SDF_UserInitiatedCheckpoint]
+ @ProcessElement
+ public ProcessContinuation processElement(
+ RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Record> outputReceiver) {
+ long currentPosition = tracker.currentRestriction().getFrom();
+ Service service = initializeService();
+ try {
+ while (true) {
+ List<Record> records = service.readNextRecords(currentPosition);
+ if (records.isEmpty()) {
+ // Return a short delay if there is no data to process at the moment.
+ return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10));
+ }
+ for (Record record : records) {
+ if (!tracker.tryClaim(record.getPosition())) {
+ return ProcessContinuation.stop(); // Claim rejected: return stop() without emitting this record.
+ }
+ currentPosition = record.getPosition() + 1; // Next read starts just past the record that was claimed.
+
+ outputReceiver.output(record);
+ }
+ }
+ } catch (ThrottlingException exception) {
+ // Return a longer delay in case we are being throttled.
+ return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(60));
+ }
+ }
+ // [END SDF_UserInitiatedCheckpoint]
+ }
+
+ private static class TruncateExample extends DoFn<String, Integer> {
+ // [START SDF_Truncate]
+ @TruncateRestriction
+ @Nullable
+ TruncateResult<OffsetRange> truncateRestriction(
+ @Element String fileName, @Restriction OffsetRange restriction) {
+ if (fileName.contains("optional")) {
+ // Skip optional files
+ return null;
+ }
+ return TruncateResult.of(restriction); // Every other file: hand the restriction back unchanged.
+ }
+ // [END SDF_Truncate]
+ }
+
+    private static class GetSizeExample extends DoFn<String, Integer> {
+      // [START SDF_GetSize]
+      @GetSize
+      double getSize(@Element String fileName, @Restriction OffsetRange restriction) {
+        // Weight the length of the restriction (to - from); files whose name contains
+        // "expensiveRecords" count double. The parentheses are required: without them only
+        // getTo() is multiplied and getFrom() is subtracted unweighted.
+        return (fileName.contains("expensiveRecords") ? 2 : 1)
+            * (restriction.getTo() - restriction.getFrom());
+      }
+      // [END SDF_GetSize]
+    }
+ }
}
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 8f557f6..c4a569d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1639,3 +1639,210 @@ def nlp_analyze_text():
| 'Parse adjacency list to JSON' >> beam.Map(json.dumps)
| 'Write adjacency list' >> beam.io.WriteToText('adjancency_list.txt'))
# [END nlp_analyze_text]
+
+
+def sdf_basic_example():
+ import os
+ from apache_beam.io.restriction_trackers import OffsetRange
+ read_next_record = None
+
+ # [START SDF_BasicExample]
+ class FileToWordsRestrictionProvider(beam.io.RestrictionProvider):
+ def initial_restriction(self, file_name):
+ return OffsetRange(0, os.stat(file_name).st_size)
+
+ def create_tracker(self, restriction):
+ return beam.io.restriction_trackers.OffsetRestrictionTracker()
+
+ class FileToWordsFn(beam.DoFn):
+ def process(
+ self,
+ file_name,
+ tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
+ with open(file_name) as file_handle:
+ file_handle.seek(tracker.current_restriction.start())
+ while tracker.try_claim(file_handle.tell()):
+ yield read_next_record(file_handle)
+
+ # Providing the coder is only necessary if it can not be inferred at
+ # runtime.
+ def restriction_coder(self):
+ return ...
+
+ # [END SDF_BasicExample]
+
+
+def sdf_basic_example_with_splitting():
+ from apache_beam.io.restriction_trackers import OffsetRange
+
+ # [START SDF_BasicExampleWithSplitting]
+ class FileToWordsRestrictionProvider(beam.io.RestrictionProvider):
+ def split(self, file_name, restriction):
+ # Compute and output 64 MiB size ranges to process in parallel
+ split_size = 64 * (1 << 20)
+ i = restriction.start
+ while i < restriction.end - split_size:
+ yield OffsetRange(i, i + split_size)
+ i += split_size
+ yield OffsetRange(i, restriction.end)
+
+ # [END SDF_BasicExampleWithSplitting]
+
+
+def sdf_sdk_initiated_checkpointing():
+ timestamp = None
+ external_service = None
+
+ class MyRestrictionProvider(object):
+ pass
+
+ # [START SDF_UserInitiatedCheckpoint]
+ class MySplittableDoFn(beam.DoFn):
+ def process(
+ self,
+ element,
+ restriction_tracker=beam.DoFn.RestrictionParam(
+ MyRestrictionProvider())):
+ current_position = restriction_tracker.current_restriction.start()
+ while True:
+ # Pull records from an external service.
+ try:
+ records = external_service.fetch(current_position)
+ if records.empty():
+ # Set a shorter delay in case we are being throttled.
+ restriction_tracker.defer_remainder(timestamp.Duration(second=10))
+ return
+ for record in records:
+ if restriction_tracker.try_claim(record.position):
+ current_position = record.position
+ yield record
+ else:
+ return
+ except TimeoutError:
+ # Set a longer delay in case we are being throttled.
+ restriction_tracker.defer_remainder(timestamp.Duration(seconds=60))
+ return
+
+ # [END SDF_UserInitiatedCheckpoint]
+
+
+def sdf_get_size():
+ # [START SDF_GetSize]
+ # The RestrictionProvider is responsible for calculating the size of a given
+ # restriction.
+ class MyRestrictionProvider(beam.transforms.core.RestrictionProvider):
+ def restriction_size(self, file_name, restriction):
+ weight = 2 if "expensiveRecords" in file_name else 1 # files named "...expensiveRecords..." count double
+ return restriction.size() * weight
+
+ # [END SDF_GetSize]
+
+
+def sdf_bad_try_claim_loop():
+ class FileToWordsRestrictionProvider(object):
+ pass
+
+ read_next_record = None
+
+ # [START SDF_BadTryClaimLoop]
+ class BadTryClaimLoop(beam.DoFn):
+ def process(
+ self,
+ file_name,
+ tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
+ with open(file_name) as file_handle:
+ file_handle.seek(tracker.current_restriction.start())
+ # The restriction tracker can be modified by another thread in parallel
+ # so storing state locally is ill advised.
+ end = tracker.current_restriction.end()
+ while file_handle.tell() < end:
+ # Only after successfully claiming should we produce any output and/or
+ # perform side effects.
+ tracker.try_claim(file_handle.tell())
+ yield read_next_record(file_handle)
+
+ # [END SDF_BadTryClaimLoop]
+
+
+def sdf_custom_watermark_estimator():
+ from apache_beam.io.iobase import WatermarkEstimator
+ from apache_beam.transforms.core import WatermarkEstimatorProvider
+ current_watermark = None
+
+ class MyRestrictionProvider(object):
+ pass
+
+ # [START SDF_CustomWatermarkEstimator]
+ # (Optional) Define a custom watermark state type to save information between
+ # bundle processing rounds.
+ class MyCustomerWatermarkEstimatorState(object):
+ def __init__(self, element, restriction):
+ # Store data necessary for future watermark computations
+ pass
+
+ # Define a WatermarkEstimator
+ class MyCustomWatermarkEstimator(WatermarkEstimator):
+ def __init__(self, estimator_state):
+ self.state = estimator_state
+
+ def observe_timestamp(self, timestamp):
+ # Will be invoked on each output from the SDF
+ pass
+
+ def current_watermark(self):
+ # Return a monotonically increasing value
+ return current_watermark
+
+ def get_estimator_state(self):
+ # Return state to resume future watermark estimation after a
+ # checkpoint/split
+ return self.state
+
+ # Then, a WatermarkEstimatorProvider needs to be created for this
+ # WatermarkEstimator
+ class MyWatermarkEstimatorProvider(WatermarkEstimatorProvider):
+ def initial_estimator_state(self, element, restriction):
+ return MyCustomerWatermarkEstimatorState(element, restriction)
+
+ def create_watermark_estimator(self, estimator_state):
+ return MyCustomWatermarkEstimator(estimator_state)
+
+ # Finally, define the SDF using your estimator.
+ class MySplittableDoFn(beam.DoFn):
+ def process(
+ self,
+ element,
+ restriction_tracker=beam.DoFn.RestrictionParam(MyRestrictionProvider()),
+ watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
+ MyWatermarkEstimatorProvider())):
+ # The current watermark can be inspected.
+ watermark_estimator.current_watermark()
+
+ # [END SDF_CustomWatermarkEstimator]
+
+
+def sdf_truncate():
+ # [START SDF_Truncate]
+ class MyRestrictionProvider(beam.transforms.core.RestrictionProvider):
+ def truncate(self, file_name, restriction):
+ if "optional" in file_name:
+ # Skip optional files
+ return None
+ return restriction # every other file keeps its restriction unchanged
+
+ # [END SDF_Truncate]
+
+
+def bundle_finalize():
+ my_callback_func = None # placeholder so the snippet below has a name to register
+
+ # [START BundleFinalize]
+ class MySplittableDoFn(beam.DoFn):
+ def process(self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
+ # ... produce output ...
+
+ # Register callback function for this bundle that performs the side
+ # effect.
+ bundle_finalizer.register(my_callback_func)
+
+ # [END BundleFinalize]
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 8371a48..548dd87 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -5143,3 +5143,281 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
}
}));
{{< /highlight >}}
+
+## 12. Splittable `DoFns` {#splittable-dofns}
+
+A Splittable `DoFn` (SDF) enables users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally, users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular `DoFn` that read the file (decreased performance). With SDF,
+we bring the richness of Apache Beam’s I/O APIs to a `DoFn` enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1. SDF basics {#sdf-basics}
+
+At a high level, an SDF is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing an SDF follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read). Within this last step,
+the element and restriction pair can pause its own processing and/or be split into further element and
+restriction pairs.
+
+![Diagram of steps that an SDF is composed of](/images/sdf_high_level_overview.svg)
+
+
+#### 12.1.1. A basic SDF {#a-basic-sdf}
+
+A basic SDF is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the `DoFn`. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define an SDF, you must choose whether the SDF is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+ return offsetrange.Restriction{
+ Start: 0,
+ End: getFileLength(filename), // the initial restriction runs from offset 0 to the file's length
+ }
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+ return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) // wraps the offset-range tracker in an sdf.LockRTracker
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+    file, err := os.Open(filename)
+    if err != nil {
+        return err
+    }
+    // Release the file handle on every return path.
+    defer file.Close()
+    offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+    if err != nil {
+        return err
+    }
+    // Emit a record only after its offset has been claimed successfully.
+    for rt.TryClaim(offset) {
+        record, newOffset := readNextRecord(file)
+        emit(record)
+        offset = newOffset
+    }
+    return nil
+}
+{{< /highlight >}}
+
+At this point, we have an SDF that supports [runner-initiated splits](#runner-initiated-split)
+enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs
+or for those runners that do not support runner-initiated splitting, we recommend providing
+a set of initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+    // int64 so that size can be combined with the restriction's offsets without a type mismatch.
+    size := int64(64 * (1 << 20))
+    i := rest.Start
+    for i < rest.End - size {
+        // Compute and output 64 MiB size ranges to process in parallel
+        end := i + size
+        splits = append(splits, offsetrange.Restriction{Start: i, End: end})
+        i = end
+    }
+    // Output the last range
+    splits = append(splits, offsetrange.Restriction{Start: i, End: rest.End})
+    return splits
+}
+{{< /highlight >}}
+
+### 12.2. Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of an SDF to inform runners so that they may
+perform intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, an initial size may be used by a runner to choose
+how and who processes the restrictions attempting to improve initial balancing and parallelization
+of work. During the processing of an element and restriction, sizing and progress are used to choose
+which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining, falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+ weight := float64(1)
+ if strings.Contains(filename, “expensiveRecords”) {
+ weight = 2
+ }
+ return weight * (rest.End - rest.Start)
+}
+{{< /highlight >}}
+
+### 12.3. User-initiated checkpoint {#user-initiated-checkpoint}
+
+Some I/Os cannot produce all of the data necessary to complete a restriction within the lifetime of a
+single bundle. This typically happens with unbounded restrictions, but can also happen with bounded
+restrictions. For example, there could be more data that needs to be ingested but is not available yet.
+Another cause of this scenario is the source system throttling your data.
+
+Your SDF can signal to the runner that it is not done processing the current restriction. This
+signal can suggest a time to resume at. While the runner tries to honor the resume time, this is not
+guaranteed. This allows execution to continue on a restriction that has available work improving
+resource utilization.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_UserInitiatedCheckpoint >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_UserInitiatedCheckpoint >}}
+{{< /highlight >}}
+
+### 12.4. Runner-initiated split {#runner-initiated-split}
+
+A runner at any time may attempt to split a restriction while it is being processed. This allows the
+runner to either pause processing of the restriction so that other work may be done (common for
+unbounded restrictions to limit the amount of output and/or improve latency) or split the restriction
+into two pieces, increasing the available parallelism within the system. It is important to author an
+SDF with this in mind since the end of the restriction may change. Thus when writing the
+processing loop, it is important to use the result from trying to claim a piece of the restriction
+instead of assuming one can process till the end.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BadTryClaimLoop >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BadTryClaimLoop >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+// Deliberate anti-pattern: this shows how NOT to write the claim loop.
+func (fn *badTryClaimLoop) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+    file, err := os.Open(filename)
+    if err != nil {
+        return err
+    }
+    offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+    if err != nil {
+        return err
+    }
+
+    // The restriction tracker can be modified by another thread in parallel
+    // so storing state locally is ill advised.
+    end := rt.GetRestriction().(offsetrange.Restriction).End
+    // BAD: loops on the locally cached end instead of the TryClaim result.
+    for offset < end {
+        // Only after successfully claiming should we produce any output and/or
+        // perform side effects.
+        // BAD: the boolean returned by TryClaim is discarded.
+        rt.TryClaim(offset)
+        record, newOffset := readNextRecord(file)
+        emit(record)
+        offset = newOffset
+    }
+    return nil
+}
+{{< /highlight >}}
+
+### 12.5. Watermark estimation {#watermark-estimation}
+
+The default watermark estimator does not produce a watermark estimate. Therefore, the output watermark
+is solely computed by the minimum of upstream watermarks.
+
+An SDF can advance the output watermark by specifying a lower bound for all future output
+that this element and restriction pair will produce. The runner computes the minimum output watermark
+by taking the minimum over all upstream watermarks and the minimum reported by each element and
+restriction pair. The reported watermark must monotonically increase for each element and restriction
+pair across bundle boundaries. When an element and restriction pair stops processing, its watermark
+is no longer considered part of the above calculation.
+
+Tips:
+* If you author an SDF that outputs records with timestamps, you should expose ways to allow users of
+this SDF to configure which watermark estimator to use.
+* Any data produced before the watermark may be considered late. See
+[watermarks and late data](#watermarks-and-late-data) for more details.
+
+#### 12.5.1. Controlling the watermark {#controlling-the-watermark}
+
+There are two general types of watermark estimators: timestamp observing and external clock observing.
+Timestamp observing watermark estimators use the output timestamp of each record to compute the watermark
+estimate while external clock observing watermark estimators control the watermark by using a clock that
+is not associated to any individual output, such as the local clock of the machine or a clock exposed
+through an external service.
+
+The restriction provider lets you override the default watermark estimation logic and use an existing
+watermark estimator implementation. You can also provide your own watermark estimator implementation.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_CustomWatermarkEstimator >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_CustomWatermarkEstimator >}}
+{{< /highlight >}}
+
+### 12.6. Truncating during drain {#truncating-during-drain}
+
+Runners which support draining pipelines need the ability to drain SDFs; otherwise, the
+pipeline may never stop. By default, bounded restrictions process the remainder of the restriction while
+unbounded restrictions finish processing at the next SDF-initiated checkpoint or runner-initiated split.
+You are able to override this default behavior by defining the appropriate method on the restriction
+provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_Truncate >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_Truncate >}}
+{{< /highlight >}}
+
+### 12.7. Bundle finalization {#bundle-finalization}
+
+Bundle finalization enables a `DoFn` to perform side effects by registering a callback.
+The callback is invoked once the runner has acknowledged that it has durably persisted the output.
+For example, a message queue might need to acknowledge messages that it has ingested into the pipeline.
+Bundle finalization is not limited to SDFs but is called out here since this is the primary
+use case.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" BundleFinalize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
+{{< /highlight >}}
\ No newline at end of file
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 05264bc..a7ba2d4 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -158,6 +158,20 @@
<li><a href="/documentation/programming-guide/#state-timers-examples">State and timers examples</a></li>
</ul>
</li>
+
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Splittable DoFns</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#sdf-basics">Basics</a></li>
+ <li><a href="/documentation/programming-guide/#sizing-and-progress">Sizing and progress</a></li>
+ <li><a href="/documentation/programming-guide/#user-initiated-checkpoint">User-initiated checkpoint</a></li>
+ <li><a href="/documentation/programming-guide/#runner-initiated-split">Runner-initiated split</a></li>
+ <li><a href="/documentation/programming-guide/#watermark-estimation">Watermark estimation</a></li>
+ <li><a href="/documentation/programming-guide/#truncating-during-drain">Truncating during drain</a></li>
+ <li><a href="/documentation/programming-guide/#bundle-finalization">Bundle finalization</a></li>
+ </ul>
+ </li>
</ul>
</li>
diff --git a/website/www/site/static/images/sdf_high_level_overview.svg b/website/www/site/static/images/sdf_high_level_overview.svg
new file mode 100644
index 0000000..9ad596f
--- /dev/null
+++ b/website/www/site/static/images/sdf_high_level_overview.svg
@@ -0,0 +1,415 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<svg
+ xmlns:dc="http://purl.org/dc/elements/1.1/"
+ xmlns:cc="http://creativecommons.org/ns#"
+ xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+ xmlns:svg="http://www.w3.org/2000/svg"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
+ xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
+ inkscape:version="1.0 (4035a4fb49, 2020-05-01)"
+ height="241.89746"
+ width="776.61823"
+ sodipodi:docname="SplittableDoFn 2020 Blog.svg"
+ id="svg121"
+ stroke-miterlimit="10"
+ stroke-linecap="square"
+ stroke="none"
+ fill="none"
+ viewBox="0 0 776.61823 241.89746"
+ version="1.1">
+ <metadata
+ id="metadata127">
+ <rdf:RDF>
+ <cc:Work
+ rdf:about="">
+ <dc:format>image/svg+xml</dc:format>
+ <dc:type
+ rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
+ </cc:Work>
+ </rdf:RDF>
+ </metadata>
+ <defs
+ id="defs125" />
+ <sodipodi:namedview
+ inkscape:current-layer="svg121"
+ inkscape:window-maximized="1"
+ inkscape:window-y="0"
+ inkscape:window-x="0"
+ inkscape:cy="480"
+ inkscape:cx="513.17521"
+ inkscape:zoom="1.0210937"
+ showgrid="false"
+ id="namedview123"
+ inkscape:window-height="1246"
+ inkscape:window-width="2560"
+ inkscape:pageshadow="2"
+ inkscape:pageopacity="0"
+ guidetolerance="10"
+ gridtolerance="10"
+ objecttolerance="10"
+ borderopacity="1"
+ bordercolor="#666666"
+ pagecolor="#ffffff" />
+ <clipPath
+ id="p.0">
+ <path
+ id="path2"
+ clip-rule="nonzero"
+ d="M 0,0 H 1280 V 960 H 0 Z" />
+ </clipPath>
+ <g
+ id="g119"
+ clip-path="url(#p.0)">
+ <path
+ id="path5"
+ fill-rule="evenodd"
+ d="M 0,0 H 1280 V 960 H 0 Z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path7"
+ fill-rule="evenodd"
+ d="m 46.39895,43.145138 v 0 C 46.39895,21.566464 63.891923,4.07349 85.470598,4.07349 H 696.9336 c 10.36242,0 20.30047,4.116465 27.62781,11.443821 7.32739,7.327356 11.44384,17.26538 11.44384,27.627827 V 199.42704 c 0,21.57867 -17.49298,39.07164 -39.07165,39.07164 H 85.470599 c -21.578674,0 -39.071648,-17.49297 -39.071648,-39.07164 z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path9"
+ fill-rule="evenodd"
+ d="m 46.39895,43.145138 v 0 C 46.39895,21.566464 63.891923,4.07349 85.470598,4.07349 H 696.9336 c 10.36242,0 20.30047,4.116465 27.62781,11.443821 7.32739,7.327356 11.44384,17.26538 11.44384,27.627827 V 199.42704 c 0,21.57867 -17.49298,39.07164 -39.07165,39.07164 H 85.470599 c -21.578674,0 -39.071648,-17.49297 -39.071648,-39.07164 z"
+ stroke-linecap="butt"
+ stroke-linejoin="round"
+ stroke-width="2"
+ stroke="#b7b7b7" />
+ <path
+ id="path11"
+ fill-rule="evenodd"
+ d="m 72,87.2021 h 112 v 78.99213 H 72 Z"
+ fill="#ff9900" />
+ <path
+ id="path13"
+ fill-rule="nonzero"
+ d="m 90.46094,109.10504 v 4.45313 h -1.5 v -11.375 h 4.1875 q 1.875,0 2.921875,0.95312 1.0625,0.95313 1.0625,2.51563 0,1.65625 -1.03125,2.5625 -1.03125,0.89062 -2.96875,0.89062 z m 0,-1.23437 h 2.6875 q 1.203125,0 1.84375,-0.5625 0.640625,-0.5625 0.640625,-1.64063 0,-1.01562 -0.640625,-1.625 -0.640625,-0.60937 -1.75,-0.625 h -2.78125 z m 13.49219,5.6875 q -0.125,-0.25 -0.20313,-0.89063 -1,1.04688 -2.40625,1.04688 -1.25,0 -2.046873,-0.70313 -0.796875,-0.71875 -0.796875,-1.79687 0,- [...]
+ fill="#ffffff" />
+ <path
+ id="path15"
+ fill-rule="nonzero"
+ d="m 100.48047,130.55817 1.625,-6.45313 h 1.45312 l -2.46875,8.45313 h -1.17187 l -2.046875,-6.40625 -2,6.40625 H 94.69922 l -2.453125,-8.45313 h 1.4375 l 1.65625,6.32813 1.96875,-6.32813 h 1.171875 z m 6.11719,2 h -1.45313 v -8.45313 h 1.45313 z m -1.5625,-10.70313 q 0,-0.34375 0.20312,-0.57812 0.21875,-0.25 0.64063,-0.25 0.42187,0 0.64062,0.25 0.21875,0.23437 0.21875,0.59375 0,0.34375 -0.21875,0.57812 -0.21875,0.23438 -0.64062,0.23438 -0.42188,0 -0.64063,-0.23438 -0.20312,-0.234 [...]
+ fill="#ffffff" />
+ <path
+ id="path17"
+ fill-rule="nonzero"
+ d="m 90.30469,144.40192 q -0.328125,-0.0625 -0.71875,-0.0625 -1.421875,0 -1.921875,1.21875 v 6 H 86.21094 v -8.45313 h 1.40625 l 0.03125,0.96875 q 0.703125,-1.125 2.015625,-1.125 0.421875,0 0.640625,0.10938 z m 4.6875,7.3125 q -1.71875,0 -2.796875,-1.125 -1.078125,-1.14063 -1.078125,-3.03125 v -0.26563 q 0,-1.25 0.46875,-2.23437 0.484375,-1 1.34375,-1.54688 0.875,-0.5625 1.875,-0.5625 1.65625,0 2.5625,1.09375 0.921875,1.07813 0.921875,3.09375 v 0.60938 H 92.55469 q 0.03125,1.25 0. [...]
+ fill="#ffffff" />
+ <path
+ id="path19"
+ fill-rule="evenodd"
+ d="M 0,126.53281 H 72"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path21"
+ fill-rule="evenodd"
+ d="M 0,126.53281 H 65.14583"
+ stroke-linecap="butt"
+ stroke-linejoin="round"
+ stroke-width="2"
+ stroke="#4a86e8" />
+ <path
+ id="path23"
+ fill-rule="evenodd"
+ d="m 65.14583,126.53281 -2.249168,2.24918 6.179535,-2.24918 -6.179535,-2.24917 z"
+ stroke-linecap="butt"
+ stroke-width="2"
+ stroke="#4a86e8"
+ fill="#4a86e8" />
+ <path
+ id="path25"
+ fill-rule="evenodd"
+ d="m 16,94.2021 h 40 v 63.33858 H 16 Z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path27"
+ fill-rule="nonzero"
+ d="M 33.71875,118.5621 H 32.203125 L 31.5,116.32773 h -4.25 l -0.71875,2.23437 H 25.078125 L 28.46875,108.34335 H 30.375 Z m -2.625,-3.46875 -1.71875,-5.46875 -1.734375,5.46875 z"
+ fill="#000000" />
+ <path
+ id="path29"
+ fill-rule="evenodd"
+ d="m 184,126.69817 h 96"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path31"
+ fill-rule="evenodd"
+ d="m 184,126.69817 h 89.14584"
+ stroke-linecap="butt"
+ stroke-linejoin="round"
+ stroke-width="2"
+ stroke="#4a86e8" />
+ <path
+ id="path33"
+ fill-rule="evenodd"
+ d="m 273.1458,126.69817 -2.24915,2.24916 6.17954,-2.24916 -6.17954,-2.24917 z"
+ stroke-linecap="butt"
+ stroke-width="2"
+ stroke="#4a86e8"
+ fill="#4a86e8" />
+ <path
+ id="path35"
+ fill-rule="evenodd"
+ d="m 184,94.2021 h 104 v 31.33858 H 184 Z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path37"
+ fill-rule="nonzero"
+ d="m 201.1875,118.5621 h -1.75 l -3.82813,-5.01562 v 5.01562 h -1.39062 v -10.21875 h 1.39062 v 4.75 l 3.75,-4.75 H 201 l -4.03125,4.85938 z m 9.375,-10.21875 -3.46875,10.21875 h -1.875 l -3.39063,-10.21875 h 1.57813 l 2.20312,6.89063 0.625,2.01562 0.64063,-2.01562 2.20312,-6.89063 z m 7.20312,9.48438 -0.84375,0.85937 -5.25,-4.39062 5.25,-4.375 0.84375,0.85937 -4.25,3.5 z m 10.34375,0.73437 h -1.51562 l -0.70313,-2.23437 h -4.25 l -0.71875,2.23437 h -1.45312 l 3.39062,-10.21875 h [...]
+ fill="#000000" />
+ <path
+ id="path39"
+ fill-rule="evenodd"
+ d="m 280,87.2021 h 176 v 78.99213 H 280 Z"
+ fill="#ff9900" />
+ <path
+ id="path41"
+ fill-rule="nonzero"
+ d="m 330.8711,108.48004 q -1.92187,-0.54687 -2.8125,-1.35937 -0.875,-0.8125 -0.875,-2 0,-1.34375 1.07813,-2.21875 1.07812,-0.875 2.79687,-0.875 1.17188,0 2.07813,0.45312 0.92187,0.45313 1.42187,1.25 0.51563,0.79688 0.51563,1.73438 h -1.51563 q 0,-1.03125 -0.65625,-1.60938 -0.65625,-0.59375 -1.84375,-0.59375 -1.10937,0 -1.73437,0.48438 -0.625,0.48437 -0.625,1.35937 0,0.6875 0.59375,1.17188 0.59375,0.48437 2,0.89062 1.42187,0.39063 2.21875,0.875 0.8125,0.46875 1.1875,1.10938 0.39062 [...]
+ fill="#ffffff" />
+ <path
+ id="path43"
+ fill-rule="nonzero"
+ d="m 326.09766,122.05817 v 2.04688 h 1.57813 v 1.10937 h -1.57813 v 5.25 q 0,0.5 0.20313,0.76563 0.21875,0.25 0.71875,0.25 0.25,0 0.6875,-0.0937 v 1.17187 q -0.5625,0.15625 -1.10938,0.15625 -0.96875,0 -1.46875,-0.57812 -0.48437,-0.59375 -0.48437,-1.67188 v -5.25 h -1.53125 v -1.10937 h 1.53125 v -2.04688 z m 2.72656,6.1875 q 0,-1.23437 0.48438,-2.21875 0.48437,-1 1.35937,-1.53125 0.875,-0.54687 1.98438,-0.54687 1.73437,0 2.79687,1.20312 1.0625,1.1875 1.0625,3.17188 v 0.0937 q 0,1. [...]
+ fill="#ffffff" />
+ <path
+ id="path45"
+ fill-rule="nonzero"
+ d="m 320.51562,147.41754 q 0,1.9375 -0.89062,3.125 -0.875,1.17187 -2.375,1.17187 -1.54688,0 -2.42188,-0.98437 v 4.07812 H 313.375 v -11.70312 h 1.32812 l 0.0625,0.9375 q 0.89063,-1.09375 2.45313,-1.09375 1.53125,0 2.40625,1.15625 0.89062,1.14062 0.89062,3.1875 z m -1.4375,-0.15625 q 0,-1.4375 -0.60937,-2.26563 -0.60938,-0.82812 -1.67188,-0.82812 -1.3125,0 -1.96875,1.17187 v 4.03125 q 0.64063,1.15625 1.98438,1.15625 1.03125,0 1.64062,-0.8125 0.625,-0.82812 0.625,-2.45312 z m 7.3671 [...]
+ fill="#ffffff" />
+ <path
+ id="path47"
+ fill-rule="evenodd"
+ d="M 456,126.53281 H 568"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path49"
+ fill-rule="evenodd"
+ d="M 456,126.53281 H 561.14581"
+ stroke-linecap="butt"
+ stroke-linejoin="round"
+ stroke-width="2"
+ stroke="#4a86e8" />
+ <path
+ id="path51"
+ fill-rule="evenodd"
+ d="m 561.1458,126.53281 -2.24915,2.24918 6.17951,-2.24918 -6.17951,-2.24917 z"
+ stroke-linecap="butt"
+ stroke-width="2"
+ stroke="#4a86e8"
+ fill="#4a86e8" />
+ <path
+ id="path53"
+ fill-rule="evenodd"
+ d="M 460,55.217846 H 564 V 125.86352 H 460 Z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path55"
+ fill-rule="nonzero"
+ d="m 477.1875,79.57785 h -1.75 l -3.82813,-5.015625 v 5.015625 h -1.39062 V 69.3591 h 1.39062 v 4.75 l 3.75,-4.75 H 477 l -4.03125,4.859375 z m 9.375,-10.21875 -3.46875,10.21875 h -1.875 L 477.82812,69.3591 h 1.57813 l 2.20312,6.890625 0.625,2.015625 0.64063,-2.015625 2.20312,-6.890625 z m 7.20312,9.484375 -0.84375,0.859375 -5.25,-4.390625 5.25,-4.375 0.84375,0.859375 -4.25,3.5 z m 10.34375,0.734375 h -1.51562 l -0.70313,-2.234375 h -4.25 l -0.71875,2.234375 h -1.45312 l 3.39062,- [...]
+ fill="#000000" />
+ <path
+ id="path57"
+ fill-rule="nonzero"
+ d="m 535.7656,85.57785 h -4.26563 V 84.7341 h 1.75 v -4.96875 l -1.64062,0.890625 -0.32813,-0.765625 2.15625,-1.140625 h 0.8125 V 84.7341 h 1.51563 z"
+ fill="#000000" />
+ <path
+ id="path59"
+ fill-rule="nonzero"
+ d="m 538.07764,71.7966 0.84375,-0.859375 5.25,4.375 -5.25,4.390625 -0.84375,-0.859375 4.25,-3.515625 z"
+ fill="#000000" />
+ <path
+ id="path61"
+ fill-rule="nonzero"
+ d="m 473.34375,96.1091 q 0.26562,0 0.5,0.109375 0.23437,0.09375 0.40625,0.28125 0.1875,0.171875 0.28125,0.421875 0.10937,0.234375 0.10937,0.5 0,0.265625 -0.10937,0.5 -0.0937,0.234375 -0.28125,0.421875 -0.17188,0.171875 -0.40625,0.265625 -0.23438,0.109375 -0.5,0.109375 -0.28125,0 -0.51563,-0.109375 -0.23437,-0.09375 -0.40625,-0.265625 -0.17187,-0.1875 -0.28125,-0.421875 -0.0937,-0.234375 -0.0937,-0.5 0,-0.265625 0.0937,-0.5 0.10938,-0.25 0.28125,-0.421875 0.17188,-0.1875 0.40625,-0 [...]
+ fill="#000000" />
+ <path
+ id="path63"
+ fill-rule="nonzero"
+ d="m 477.1875,117.57785 h -1.75 l -3.82813,-5.01563 v 5.01563 h -1.39062 V 107.3591 h 1.39062 v 4.75 l 3.75,-4.75 H 477 l -4.03125,4.85937 z m 9.375,-10.21875 -3.46875,10.21875 h -1.875 l -3.39063,-10.21875 h 1.57813 l 2.20312,6.89062 0.625,2.01563 0.64063,-2.01563 2.20312,-6.89062 z m 7.20312,9.48437 -0.84375,0.85938 -5.25,-4.39063 5.25,-4.375 0.84375,0.85938 -4.25,3.5 z m 10.34375,0.73438 h -1.51562 l -0.70313,-2.23438 h -4.25 l -0.71875,2.23438 h -1.45312 l 3.39062,-10.21875 h [...]
+ fill="#000000" />
+ <path
+ id="path65"
+ fill-rule="nonzero"
+ d="m 535.8281,123.57785 h -1.21875 l -1.96875,-4.23438 -0.57813,-1.375 v 3.4375 2.17188 h -0.85937 v -6.79688 h 1.1875 l 1.89062,4.01563 0.67188,1.54687 v -3.64062 -1.92188 h 0.875 z"
+ fill="#000000" />
+ <path
+ id="path67"
+ fill-rule="nonzero"
+ d="m 538.07764,109.7966 0.84375,-0.85938 5.25,4.375 -5.25,4.39063 -0.84375,-0.85938 4.25,-3.51562 z"
+ fill="#000000" />
+ <path
+ id="path69"
+ fill-rule="evenodd"
+ d="m 568,87.2021 h 152 v 78.99213 H 568 Z"
+ fill="#ff9900" />
+ <path
+ id="path71"
+ fill-rule="nonzero"
+ d="m 580.03906,118.60504 v 4.45313 h -1.5 v -11.375 h 4.1875 q 1.875,0 2.92187,0.95312 1.0625,0.95313 1.0625,2.51563 0,1.65625 -1.03125,2.5625 -1.03125,0.89062 -2.96875,0.89062 z m 0,-1.23437 h 2.6875 q 1.20312,0 1.84375,-0.5625 0.64062,-0.5625 0.64062,-1.64063 0,-1.01562 -0.64062,-1.625 -0.64063,-0.60937 -1.75,-0.625 h -2.78125 z m 12.45312,-1.46875 q -0.32812,-0.0625 -0.71875,-0.0625 -1.42187,0 -1.92187,1.21875 v 6 h -1.45313 v -8.45313 h 1.40625 l 0.0312,0.96875 q 0.70313,-1.12 [...]
+ fill="#ffffff" />
+ <path
+ id="path73"
+ fill-rule="nonzero"
+ d="m 626.78845,142.05817 q -0.125,-0.25 -0.20313,-0.89063 -1,1.04688 -2.40625,1.04688 -1.25,0 -2.04687,-0.70313 -0.79688,-0.71875 -0.79688,-1.79687 0,-1.32813 1,-2.04688 1,-0.73437 2.82813,-0.73437 h 1.40625 v -0.67188 q 0,-0.75 -0.45313,-1.20312 -0.45312,-0.45313 -1.34375,-0.45313 -0.76562,0 -1.29687,0.39063 -0.51563,0.39062 -0.51563,0.95312 h -1.45312 q 0,-0.64062 0.4375,-1.21875 0.45312,-0.59375 1.21875,-0.9375 0.78125,-0.34375 1.70312,-0.34375 1.45313,0 2.28125,0.73438 0.82813 [...]
+ fill="#ffffff" />
+ <path
+ id="path75"
+ fill-rule="nonzero"
+ d="m 664.1869,143.38629 1.82813,-2.90625 h 1.17187 l -2.39062,3.76562 2.45312,3.8125 h -1.1875 l -1.875,-2.95312 -1.85937,2.95312 h -1.1875 l 2.45312,-3.8125 -2.39062,-3.76562 h 1.17187 z"
+ fill="#ffffff" />
+ <path
+ id="path77"
+ fill-rule="evenodd"
+ d="m 736,94.2021 h 32 v 31.33858 h -32 z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path79"
+ fill-rule="nonzero"
+ d="m 752.9375,115.4996 q 0,0.73438 -0.28125,1.3125 -0.28125,0.5625 -0.8125,0.95313 -0.53125,0.39062 -1.28125,0.59375 -0.73438,0.20312 -1.65625,0.20312 h -2.67188 v -10.21875 h 2.92188 q 3.42187,0 3.42187,2.48438 0,0.82812 -0.40625,1.42187 -0.39062,0.59375 -1.28125,0.89063 0.42188,0.0781 0.78125,0.26562 0.375,0.1875 0.65625,0.48438 0.28125,0.29687 0.4375,0.70312 0.17188,0.40625 0.17188,0.90625 z m -1.8125,-4.48437 q 0,-0.3125 -0.0937,-0.57813 -0.0937,-0.28125 -0.32813,-0.48437 -0.2 [...]
+ fill="#000000" />
+ <path
+ id="path81"
+ fill-rule="evenodd"
+ d="m 720,126.69817 56,-0.31497"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path83"
+ fill-rule="evenodd"
+ d="m 720,126.69817 49.14593,-0.27642"
+ stroke-linecap="butt"
+ stroke-linejoin="round"
+ stroke-width="2"
+ stroke="#4a86e8" />
+ <path
+ id="path85"
+ fill-rule="evenodd"
+ d="m 769.14594,126.42175 -2.23651,2.26178 6.16681,-2.28389 -6.19208,-2.21437 z"
+ stroke-linecap="butt"
+ stroke-width="2"
+ stroke="#4a86e8"
+ fill="#4a86e8" />
+ <path
+ id="path87"
+ fill-rule="evenodd"
+ d="M 670.224,165.70341 V 190.2021 H 528 v -63"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path89"
+ fill-rule="evenodd"
+ d="M 670.224,165.70341 V 190.2021 H 528 v -56.14583"
+ stroke-linecap="butt"
+ stroke-linejoin="round"
+ stroke-width="2"
+ stroke="#4a86e8" />
+ <path
+ id="path91"
+ fill-rule="evenodd"
+ d="m 528,134.05627 2.24915,2.24916 L 528,130.1259 l -2.24915,6.17953 z"
+ stroke-linecap="butt"
+ stroke-width="2"
+ stroke="#4a86e8"
+ fill="#4a86e8" />
+ <path
+ id="path93"
+ fill-rule="evenodd"
+ d="m 548.3436,182.09636 v 0 c -0.99908,-2.58107 2.28101,-5.13618 8.4483,-6.5811 6.1673,-1.44492 14.14026,-1.52623 20.53565,-0.20944 v 0 c 2.26544,-1.50074 6.4118,-2.5369 11.185,-2.79506 4.77319,-0.25817 9.61248,0.29201 13.05413,1.48408 v 0 c 1.92987,-1.36067 5.71918,-2.27487 10.02332,-2.41819 4.30414,-0.14331 8.51392,0.50453 11.1355,1.71365 v 0 c 3.48657,-1.44231 9.03381,-2.04959 14.24139,-1.55905 5.20752,0.49054 9.14014,1.99077 10.09607,3.85154 v 0 c 4.27161,0.40963 7.82983,1.450 [...]
+ fill="#ff9900" />
+ <path
+ id="path95"
+ fill-rule="evenodd"
+ d="m 543.22095,190.37572 v 0 c 2.19464,0.4541 4.72998,0.6601 7.26556,0.59032 m 3.32111,6.75872 c 1.09039,-0.0449 2.15918,-0.13984 3.17883,-0.28253 m 27.43964,3.09269 c -0.76691,-0.40433 -1.409,-0.83638 -1.91535,-1.28881 m 36.55225,-0.52661 v 0 c 0.39563,-0.46057 0.65204,-0.93458 0.76477,-1.41415 m 24.6134,-3.48175 c 0.0513,-2.24566 -3.57678,-4.30179 -9.32586,-5.28525 m 21.98626,-5.63465 c -0.93103,0.7647 -2.35235,1.44303 -4.15258,1.98185 m -5.88623,-9.30275 v 0 c 0.15863,0.3088 0. [...]
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path97"
+ fill-rule="nonzero"
+ d="m 572.07404,181.51132 q -0.23438,1.32812 -1.1875,2.10937 -0.95313,0.76563 -2.28125,0.73438 -0.79688,-0.0312 -1.39063,-0.39063 -0.59375,-0.375 -0.92187,-1.07812 -0.32813,-0.70313 -0.35938,-1.54688 -0.0156,-0.48437 0.0469,-0.9375 l 0.0937,-0.70312 q 0.28125,-1.92188 1.34375,-3.03125 1.07812,-1.10938 2.625,-1.09375 1.20312,0.0312 1.90625,0.78125 0.70312,0.75 0.73437,2.09375 h -1.09375 q -0.0781,-1.79688 -1.40625,-1.9375 l -0.1875,-0.0156 q -1.10937,-0.0312 -1.85937,0.82813 -0.7343 [...]
+ fill="#ffffff" />
+ <path
+ id="path99"
+ fill-rule="nonzero"
+ d="m 579.96466,198.96445 h -0.95313 l 5.0625,-9.26563 h 0.95313 z m 11.61133,-2.45313 q 0.0937,-0.65625 -0.71875,-0.92187 l -1.10938,-0.32813 q -1.39062,-0.48437 -1.34375,-1.59375 0.0469,-0.82812 0.76563,-1.35937 0.71875,-0.54688 1.70312,-0.53125 0.95313,0 1.54688,0.5625 0.60937,0.54687 0.59375,1.42187 l -1.0625,-0.0156 q 0.0156,-0.46875 -0.29688,-0.76563 -0.3125,-0.29687 -0.82812,-0.3125 -0.54688,-0.0156 -0.9375,0.26563 -0.375,0.26562 -0.4375,0.70312 -0.0781,0.54688 0.67187,0.796 [...]
+ fill="#ffffff" />
+ <path
+ id="path101"
+ fill-rule="evenodd"
+ d="m 547.1102,195.32545 h 104 v 31.33859 h -104 z"
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path103"
+ fill-rule="nonzero"
+ d="m 564.2977,219.68546 h -1.75 l -3.82813,-5.01563 v 5.01563 h -1.39062 v -10.21875 h 1.39062 v 4.75 l 3.75,-4.75 h 1.64063 l -4.03125,4.85937 z m 9.375,-10.21875 -3.46875,10.21875 h -1.875 l -3.39063,-10.21875 h 1.57813 l 2.20312,6.89062 0.625,2.01563 0.64063,-2.01563 2.20312,-6.89062 z m 7.20312,9.48437 -0.84375,0.85938 -5.25,-4.39063 5.25,-4.375 0.84375,0.85938 -4.25,3.5 z m 10.34375,0.73438 h -1.51562 l -0.70313,-2.23438 h -4.25 l -0.71875,2.23438 h -1.45312 l 3.39062,-10.218 [...]
+ fill="#000000" />
+ <path
+ id="path105"
+ fill-rule="nonzero"
+ d="m 623.4227,225.68546 h -1.17188 l -1.65625,-2.75 -1.65625,2.75 h -1.15625 l 2.26563,-3.46875 -2.07813,-3.32813 h 1.09375 l 1.54688,2.5625 1.57812,-2.5625 h 1.0625 l -2.09375,3.28125 z m 3.65576,-7.35938 -0.15625,2.45313 h -0.89062 l -0.14063,-2.45313 z"
+ fill="#000000" />
+ <path
+ id="path107"
+ fill-rule="nonzero"
+ d="m 631.0467,211.9042 0.84375,-0.85938 5.25,4.375 -5.25,4.39063 -0.84375,-0.85938 4.25,-3.51562 z"
+ fill="#000000" />
+ <path
+ id="path109"
+ fill-rule="evenodd"
+ d="m 433.0334,25.323118 v 0 c -1.55396,-3.918263 3.548,-7.797104 13.14096,-9.99059 9.59299,-2.193486 21.99454,-2.316926 31.94223,-0.317941 v 0 c 3.5238,-2.27824 9.97333,-3.851207 17.3978,-4.24311 7.42444,-0.391903 14.95178,0.443288 20.30508,2.252947 v 0 c 3.00183,-2.065599 8.89594,-3.4534183 15.59076,-3.6709857 6.69489,-0.2175684 13.24304,0.7659057 17.3208,2.6014337 v 0 c 5.42322,-2.1895293 14.0517,-3.1114129 22.1518,-2.3667449 8.10015,0.7446669 14.2171,3.0221369 15.70397,5.846933 [...]
+ fill="#ff9900" />
+ <path
+ id="path111"
+ fill-rule="evenodd"
+ d="m 425.06528,37.891766 v 0 c 3.4137,0.689373 7.35727,1.002087 11.3013,0.89616 m 5.1658,10.260231 c 1.69608,-0.0681 3.35855,-0.212307 4.94455,-0.428902 m 42.68109,4.694912 c -1.1929,-0.613792 -2.19168,-1.269683 -2.97928,-1.956493 m 56.85532,-0.799435 v 0 c 0.61548,-0.699173 1.01422,-1.418773 1.18964,-2.146797 m 38.28503,-5.285549 c 0.0798,-3.40905 -5.56354,-6.530414 -14.50598,-8.023369 m 34.19867,-8.553799 c -1.44818,1.160845 -3.65906,2.190617 -6.45917,3.008572 m -9.15582,-14.122 [...]
+ fill-opacity="0"
+ fill="#000000" />
+ <path
+ id="path113"
+ fill-rule="nonzero"
+ d="m 456.44363,19.39394 h -1.96875 l -0.59375,3.453125 h -1.10938 l 1.48438,-8.531251 h 2.5625 q 1.34375,0.01563 2.04687,0.6875 0.70313,0.671875 0.59375,1.828126 -0.0625,0.8125 -0.59375,1.421875 -0.51562,0.609375 -1.375,0.90625 l 1.32813,3.609375 -0.0156,0.07813 h -1.17187 z m -1.8125,-0.921875 1.625,0.01563 q 0.84375,0 1.42187,-0.453125 0.57813,-0.453125 0.67188,-1.203125 0.0937,-0.734375 -0.29688,-1.140626 -0.375,-0.421875 -1.15625,-0.4375 l -1.70312,-0.01563 z m 9.22461,3.75 q [...]
+ fill="#ffffff" />
+ <path
+ id="path115"
+ fill-rule="nonzero"
+ d="m 469.0911,36.847065 q -0.0469,-0.171875 -0.0469,-0.328125 l 0.0156,-0.34375 q -0.85938,0.8125 -1.84375,0.796875 -0.84375,-0.01563 -1.375,-0.53125 -0.51563,-0.53125 -0.46875,-1.3125 0.0625,-1 0.875,-1.5625 0.82812,-0.5625 2.17187,-0.5625 h 1.10938 l 0.0781,-0.5 q 0.0469,-0.578127 -0.23437,-0.906252 -0.26563,-0.328125 -0.82813,-0.34375 -0.54687,-0.01563 -0.96875,0.265625 -0.40625,0.28125 -0.5,0.734377 l -1.07812,0.01563 q 0.0625,-0.562502 0.4375,-0.984377 0.375,-0.4375 0.96875,- [...]
+ fill="#ffffff" />
+ <path
+ id="path117"
+ fill-rule="nonzero"
+ d="m 490.21317,49.347065 2.01562,-4.84375 h 1.09375 l -2.89062,6.34375 h -0.85938 l -0.65625,-4.796875 -2.28125,4.796875 h -0.85937 l -0.6875,-6.34375 h 1.01562 l 0.39063,4.71875 2.25,-4.71875 h 0.84375 z m 6.63477,-4.953125 q 0.79687,0 1.35937,0.421875 0.57813,0.421875 0.84375,1.171875 0.28125,0.734375 0.1875,1.625 l -0.0156,0.125 q -0.10938,0.921875 -0.54688,1.671875 -0.4375,0.75 -1.14062,1.171875 -0.6875,0.40625 -1.53125,0.390625 -0.78125,-0.01563 -1.35938,-0.4375 -0.5625,-0.42 [...]
+ fill="#ffffff" />
+ </g>
+</svg>