You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/07 09:29:01 UTC

[2/2] git commit: Merge branch 'S4-102' into dev

Updated Branches:
  refs/heads/dev 0f037e5b3 -> bac9cb201


Merge branch 'S4-102' into dev

Conflicts:
	subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
	subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
	subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
	subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/bac9cb20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/bac9cb20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/bac9cb20

Branch: refs/heads/dev
Commit: bac9cb2017d600e2f5d982c913fc4f41cf7c4204
Parents: 0f037e5 bbbaba4
Author: Daniel Gómez Ferro <df...@apache.org>
Authored: Thu Mar 7 10:28:35 2013 +0100
Committer: Daniel Gómez Ferro <df...@apache.org>
Committed: Thu Mar 7 10:28:35 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DefaultCommModule.java |    3 +-
 .../src/main/java/org/apache/s4/core/App.java      |   25 +++++++++++++-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    3 +-
 .../main/java/org/apache/s4/core/SenderImpl.java   |   18 +++++++++-
 .../org/apache/s4/fixtures/MockCoreModule.java     |    1 -
 .../org/apache/s4/wordcount/WordCounterPE.java     |   26 +++++++++------
 .../s4/example/twitter/TwitterCounterApp.java      |    2 +-
 7 files changed, 60 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bac9cb20/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bac9cb20/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index ccb02e8,673d0bb..b3c0586
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@@ -26,13 -26,11 +26,14 @@@ import java.util.concurrent.TimeUnit
  import org.apache.s4.base.Event;
  import org.apache.s4.base.Hasher;
  import org.apache.s4.base.KeyFinder;
 +import org.apache.s4.base.Sender;
  import org.apache.s4.base.SerializerDeserializer;
 -import org.apache.s4.comm.serialize.KryoSerDeser;
 +import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+ import org.apache.s4.comm.topology.Cluster;
  import org.apache.s4.comm.topology.RemoteStreams;
  import org.apache.s4.core.ft.CheckpointingFramework;
 +import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 +import org.apache.s4.core.util.S4Metrics;
  import org.apache.s4.core.window.AbstractSlidingWindowPE;
  import org.apache.s4.core.window.SlotFactory;
  import org.slf4j.Logger;
@@@ -68,9 -66,12 +69,12 @@@ public abstract class App 
      @Inject
      private Sender sender;
      @Inject
 -    private Receiver receiver;
 +    private ReceiverImpl receiver;
  
      @Inject
+     private Cluster cluster;
+ 
+     @Inject
      private RemoteSenders remoteSenders;
  
      @Inject

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bac9cb20/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bac9cb20/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index b103ad8,0000000..70e159a
mode 100644,000000..100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@@ -1,178 -1,0 +1,192 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.s4.core;
 +
 +import java.nio.ByteBuffer;
 +import java.util.concurrent.ExecutorService;
 +
 +import org.apache.s4.base.Emitter;
 +import org.apache.s4.base.Event;
 +import org.apache.s4.base.Hasher;
 +import org.apache.s4.base.Sender;
 +import org.apache.s4.base.SerializerDeserializer;
 +import org.apache.s4.comm.topology.Assignment;
++import org.apache.s4.comm.topology.Cluster;
++import org.apache.s4.comm.topology.ClusterChangeListener;
 +import org.apache.s4.comm.topology.ClusterNode;
 +import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 +import org.apache.s4.core.util.S4Metrics;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.inject.Inject;
 +
 +/**
 + * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
 + * <p>
 + * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
 + * <p>
 + * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
 + * from the application developer.
 + */
- public class SenderImpl implements Sender {
++public class SenderImpl implements Sender, ClusterChangeListener {
 +
 +    private static Logger logger = LoggerFactory.getLogger(SenderImpl.class);
 +
 +    final private Emitter emitter;
 +    final private SerializerDeserializer serDeser;
 +    final private Hasher hasher;
++    private Cluster cluster;
 +
 +    Assignment assignment;
 +    private int localPartitionId = -1;
 +
 +    private final ExecutorService tpe;
 +
 +    @Inject
 +    S4Metrics metrics;
 +
 +    /**
 +     * 
 +     * @param emitter
 +     *            the emitter implements the low level communication layer.
 +     * @param serDeser
 +     *            a serialization mechanism.
 +     * @param hasher
 +     *            a hashing function to map keys to partition IDs.
 +     */
 +    @Inject
 +    public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
-             SenderExecutorServiceFactory senderExecutorServiceFactory) {
++            SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
 +        this.emitter = emitter;
 +        this.serDeser = serDeser;
 +        this.hasher = hasher;
 +        this.assignment = assignment;
 +        this.tpe = senderExecutorServiceFactory.create();
++        this.cluster = cluster;
 +    }
 +
 +    @Inject
++    private void init() {
++        cluster.addListener(this);
++        resolveLocalPartitionId();
++    }
++
 +    private void resolveLocalPartitionId() {
 +        ClusterNode node = assignment.assignClusterNode();
 +        if (node != null) {
 +            localPartitionId = node.getPartition();
 +        }
 +    }
 +
 +    /*
 +     * (non-Javadoc)
 +     * 
 +     * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
 +     */
 +    @Override
 +    public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
 +        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
 +        if (partition == localPartitionId) {
 +            metrics.sentLocal();
 +            /* Hey we are in the same JVM, don't use the network. */
 +            return false;
 +        }
 +        send(partition, event);
 +        metrics.sentEvent(partition);
 +        return true;
 +    }
 +
 +    private void send(int partition, Event event) {
 +        tpe.submit(new SerializeAndSendToRemotePartitionTask(event, partition));
 +    }
 +
 +    /*
 +     * (non-Javadoc)
 +     * 
 +     * @see org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event)
 +     */
 +    @Override
 +    public void sendToAllRemotePartitions(Event event) {
 +        tpe.submit(new SerializeAndSendToAllRemotePartitionsTask(event));
 +
 +    }
 +
 +    class SerializeAndSendToRemotePartitionTask implements Runnable {
 +        Event event;
 +        int remotePartitionId;
 +
 +        public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
 +            this.event = event;
 +            this.remotePartitionId = remotePartitionId;
 +        }
 +
 +        @Override
 +        public void run() {
 +            ByteBuffer serializedEvent = serDeser.serialize(event);
 +            try {
 +                emitter.send(remotePartitionId, serializedEvent);
 +            } catch (InterruptedException e) {
 +                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
 +                Thread.currentThread().interrupt();
 +            }
 +
 +        }
 +
 +    }
 +
 +    class SerializeAndSendToAllRemotePartitionsTask implements Runnable {
 +
 +        Event event;
 +
 +        public SerializeAndSendToAllRemotePartitionsTask(Event event) {
 +            super();
 +            this.event = event;
 +        }
 +
 +        @Override
 +        public void run() {
 +            ByteBuffer serializedEvent = serDeser.serialize(event);
 +
 +            for (int i = 0; i < emitter.getPartitionCount(); i++) {
 +
 +                /* Don't use the comm layer when we send to the same partition. */
 +                if (localPartitionId != i) {
 +                    try {
 +                        emitter.send(i, serializedEvent);
 +                        metrics.sentEvent(i);
 +                    } catch (InterruptedException e) {
 +                        logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
 +                        // no reason to continue: we were interrupted, so we reset the interrupt status and leave
 +                        Thread.currentThread().interrupt();
 +                        break;
 +                    }
 +                }
 +            }
 +
 +        }
 +
 +    }
 +
++    @Override
++    public void onChange() {
++        resolveLocalPartitionId();
++    }
++
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bac9cb20/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 7bacf3b,6a60ec5..29a43e8
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@@ -47,22 -45,12 +47,21 @@@ public class MockCoreModule extends Abs
  
      @Override
      protected void configure() {
 -        bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
 -        bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
 -        bind(Listener.class).toInstance(Mockito.mock(Listener.class));
 -        bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
 -        Cluster clusterMock = Mockito.mock(Cluster.class);
 -        Mockito.when(clusterMock.getPhysicalCluster()).thenReturn(new PhysicalCluster(1));
 -        bind(Cluster.class).toInstance(clusterMock);
 +        // Although we want to mock as much as possible, most tests still require the machinery for routing events
 +        // within a stream/node, therefore sender and stream executors are not mocked
 +
 +        // NOTE: we use a blocking executor so that events don't get dropped in simple tests
 +        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
 +
 +        bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
 +        bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
 +
 +        bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
 +        bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
 +
 +        bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
 +        bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);
 +
 +        bind(Integer.class).annotatedWith(Names.named("s4.stream.workQueueSize")).toInstance(10000);
- 
      }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bac9cb20/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --cc test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index 456305a,5d7855f..50ee64f
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@@ -85,19 -77,6 +85,19 @@@ public class TwitterCounterApp extends 
          }
      }
  
 +    private void prepareMetricsOutputs() throws IOException {
-         File metricsDirForPartition = new File("metrics/" + getReceiver().getPartitionId());
++        File metricsDirForPartition = new File("metrics/" + getPartitionId());
 +        if (metricsDirForPartition.exists()) {
 +            FileUtils.deleteDirectory(metricsDirForPartition);
 +        }
 +        // activate metrics csv dump
 +        if (!metricsDirForPartition.mkdirs()) {
 +            LoggerFactory.getLogger(getClass()).error("Cannot create directory {}",
 +                    new File("metrics").getAbsolutePath());
 +        }
 +        CsvReporter.enable(metricsDirForPartition, 10, TimeUnit.SECONDS);
 +    }
 +
      @Override
      protected void onStart() {