You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by bhupeshchawda <gi...@git.apache.org> on 2016/06/07 20:07:56 UTC

[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106][WIP] Support multiple strea...

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/309#discussion_r66142732
  
    --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
    @@ -0,0 +1,277 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.stream;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Module;
    +
    +/**
    + * Module that works around the {@link StreamMerger} limitation of merging only two streams at a time by chaining
    + * StreamMerger operators together, so that any number of streams can be combined into a single stream.
    + *
    + * Usage:
    + * <pre>
    + * dag.addOperator("Stream_1", op1);
    + * dag.addOperator("Stream_2", op2);
    + * dag.addOperator("Stream_3", op3);
    + *
    + * MultipleStreamMerger merger = new MultipleStreamMerger();
    + * merger.merge(op1.out)
    + *     .merge(op2.out)
    + *     .merge(op3.out)
    + *     .insertInto(dag, conf);
    + *
    + * dag.addModule("Merger", merger);
    + * </pre>
    + *
    + * @param <K> type of the tuples carried by the streams being merged
    + */
    +public class MultipleStreamMerger<K> implements Module
    +{
    +  /**
    +   * Describes one stream to be added to the DAG: the name it is registered under, the upstream output port it
    +   * reads from, and the downstream input port it feeds.
    +   */
    +  public class Stream
    +  {
    +    String name;
    +    DefaultOutputPort sourcePort;
    +    DefaultInputPort destPort;
    +
    +    public Stream(String streamName, DefaultOutputPort source, DefaultInputPort sink)
    +    {
    +      name = streamName;
    +      sourcePort = source;
    +      destPort = sink;
    +    }
    +  }
    +
    +  /**
    +   * Pairs a StreamMerger operator with the name under which it is added to the DAG.
    +   */
    +  public class NamedMerger
    +  {
    +    String name;
    +    StreamMerger<K> merger;
    +
    +    public NamedMerger(String operatorName, StreamMerger<K> op)
    +    {
    +      name = operatorName;
    +      merger = op;
    +    }
    +  }
    +
    +  // NOTE(review): not referenced in the visible part of this class; confirm it is used further down, or remove it.
    +  private int streamCount = 0;
    +
    +  // Upstream output ports registered via merge(), kept in registration order.
    +  ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
    +
    +  // The module's single merged output; constructMergeTree binds it to the final StreamMerger's output port.
    +  // NOTE(review): reason for `transient` is not visible here; presumably the port is rebuilt on deploy — confirm.
    +  public transient ProxyOutputPort<K> streamOutput = new ProxyOutputPort<>();
    +
    +  /**
    +   * Registers an upstream output port whose stream should be merged into this module's single output.
    +   * Ports are kept in the order they are registered, and calls can be chained.
    +   *
    +   * @param sourcePort - The output port from the upstream operator that provides data; must not be null
    +   * @return this MultipleStreamMerger, so that further calls to {@code merge} can be chained
    +   * @throws IllegalArgumentException if {@code sourcePort} is null
    +   */
    +  public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
    +  {
    +    // Fail fast: a null port would otherwise only be detected later (if at all), when the merge tree is wired
    +    // into the DAG, far away from the call that registered it.
    +    if (sourcePort == null) {
    +      throw new IllegalArgumentException("sourcePort must not be null");
    +    }
    +    streamsToMerge.add(sourcePort);
    +    return this;
    +  }
    +
    +  /**
    +   * Adds the merge tree computed by {@link #constructMergeTree(ArrayList, ArrayList)} to the given DAG.
    +   *
    +   * Each StreamMerger combines only two inputs (data1 and data2), so to merge more than two streams we build a
    +   * binary tree of StreamMerger operators. For example, with four streams:
    +   *
    +   * <pre>
    +   * Tier 0          Tier 1              Tier 2
    +   *
    +   * Stream 1 ->
    +   *                 StreamMerger_1 ->
    +   * Stream 2 ->
    +   *                                     StreamMerger_Final -> Out
    +   * Stream 3 ->
    +   *                 StreamMerger_2 ->
    +   * Stream 4 ->
    +   * </pre>
    +   *
    +   * Every merger is added with {@code dag.addOperator}, and every connecting stream is added with
    +   * {@code dag.addStream} using {@link DAG.Locality#CONTAINER_LOCAL} locality.
    +   * NOTE(review): streams are CONTAINER_LOCAL, not THREAD_LOCAL; confirm this is the intended locality.
    +   *
    +   * @param dag  the DAG to which the merger operators and their streams are added
    +   * @param conf configuration; not used by this method
    +   * @throws IllegalArgumentException if fewer than two streams were registered via {@link #merge(DefaultOutputPort)}
    +   */
    +  public void mergeStreams(DAG dag, Configuration conf)
    +  {
    +    // constructMergeTree repeats this check; this one runs first, so its message is the one callers see.
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " +
    +          "merging with `.merge()`.");
    +    }
    +
    +    ArrayList<Stream> streamsToAddToDag = new ArrayList<>();
    +    ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>();
    +
    +    // Determine operators and streams to add to the DAG
    +    constructMergeTree(streamsToAddToDag, operatorsToAdd);
    +
    +    // Add every merger operator first, then wire the streams between them.
    +    for (NamedMerger m : operatorsToAdd) {
    +      dag.addOperator(m.name, m.merger);
    +    }
    +
    +    for (Stream s : streamsToAddToDag) {
    +      dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
    +    }
    +  }
    +
    +  /**
    +   * Given a set of streams to be merged (defined via {@link #merge(DefaultOutputPort)}), compute the optimal
    +   * structure of cascading mergers that need to be instantiated, added to the dag, and linked together.
    +   * @param streamsToAddToDag - (output)  A list that is populated with streams that should be added to the  DAG
    +   * @param operatorsToAdd - (output) A list that is populated with operators to be added to the DAG
    +   */
    +  public void constructMergeTree(
    +      ArrayList<Stream> streamsToAddToDag,
    +      ArrayList<NamedMerger> operatorsToAdd)
    +  {
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge. Ensure `.merge` was called for each stream " +
    +          "to be added.");
    +    }
    +
    +    // Define the final merger in the sequence and connect its output to the module's output
    +    StreamMerger<K> finalMerger = new StreamMerger<>();
    +    operatorsToAdd.add(new NamedMerger("Merger_Final", finalMerger));
    +    streamOutput.set(finalMerger.out);
    +
    +    ArrayList<ArrayList<StreamMerger<K>>> mergers = new ArrayList<>();
    +
    +    /**
    +     * First, calculate the number of tiers we need to merge all streams given that each merger can only merge two
    +     * streams at a time.
    +     */
    +    int numTiers = (int)Math.ceil(Math.log(streamsToMerge.size()) / Math.log(2));
    +
    +    // Handle the simple case where we only have a single tier (only two streams to merge)
    +    if (numTiers == 1) {
    +      assert (streamsToMerge.size() == 2);
    +      streamsToAddToDag.add(new Stream("FinalMerge_Stream_0", streamsToMerge.get(0), finalMerger.data1));
    +      streamsToAddToDag.add(new Stream("FinalMerge_Stream_1", streamsToMerge.get(1), finalMerger.data2));
    +
    +      // We don't need to add any operators since we've already added the final merger
    +    } else {
    +      Iterator<DefaultOutputPort<K>> streams = streamsToMerge.iterator();
    +
    +      // When assigning streams, we will switch between ports 1 and 2 as we use successive mergers.
    +      boolean usePort1;
    +
    +      // For each tier, create the mergers in that tier, and connect the relevant streams
    +      for (int i = 0; i < numTiers - 1; i++) {
    +        int streamIdx = 0;
    +        usePort1 = true;
    +
    +        int numMergers = (int)Math.ceil(streamsToMerge.size() / Math.pow(2, i + 1));
    --- End diff --
    
    Should this be `Math.floor` instead of `Math.ceil`? When the number of streams is odd, the one leftover stream could be fed directly into the final tier instead of getting a merger of its own.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---