You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/10/17 01:00:58 UTC

samza git commit: SAMZA-1959: Modify EmbeddedTaggedRateLimiter to use total number of tasks for effective rate calculation

Repository: samza
Updated Branches:
  refs/heads/master 947c2f0ef -> 4d6ff989b


SAMZA-1959: Modify EmbeddedTaggedRateLimiter to use total number of tasks for effective rate calculation

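The EmbeddedTaggedRateLimiter currently uses the number of tasks assigned to the current container to calculate the effective rate. This is inaccurate: each task's share is the configured rate divided by that count, so whenever the job runs more than one container, the tasks collectively exceed the configured rate. With the latest API refactoring, the limiter can now use the total number of tasks across all containers in the job.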

Author: Wei Song <ws...@linkedin.com>

Reviewers: Peng Du <pd...@linkedin.com>

Closes #735 from weisong44/SAMZA-1959 and squashes the following commits:

ae0a0dd9 [Wei Song] SAMZA-1959: Modify EmbeddedTaggedRateLimiter to use total number of tasks for effective rate calculation
a06e8ec2 [Wei Song] Merge remote-tracking branch 'upstream/master'
2c679c39 [Wei Song] Merge remote-tracking branch 'upstream/master'
a56c28dc [Wei Song] Merge remote-tracking branch 'upstream/master'
097958c8 [Wei Song] Merge remote-tracking branch 'upstream/master'
05822f0a [Wei Song] Merge remote-tracking branch 'upstream/master'
f7480505 [Wei Song] Merge remote-tracking branch 'upstream/master'
7706ab1f [Wei Song] Merge remote-tracking branch 'upstream/master'
f5731b10 [Wei Song] Merge remote-tracking branch 'upstream/master'
1e5de45a [Wei Song] Merge remote-tracking branch 'upstream/master'
c85604e0 [Wei Song] Merge remote-tracking branch 'upstream/master'
242d8442 [Wei Song] Merge remote-tracking branch 'upstream/master'
ec7d8409 [Wei Song] Merge remote-tracking branch 'upstream/master'
e19b4dc9 [Wei Song] Merge remote-tracking branch 'upstream/master'
8ee78441 [Wei Song] Merge remote-tracking branch 'upstream/master'
1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master'
a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master'
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4d6ff989
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4d6ff989
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4d6ff989

Branch: refs/heads/master
Commit: 4d6ff989bcd284c2231ffcbba1bf9e1f5dbe7d7f
Parents: 947c2f0
Author: Wei Song <ws...@linkedin.com>
Authored: Tue Oct 16 18:00:51 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Tue Oct 16 18:00:51 2018 -0700

----------------------------------------------------------------------
 .../samza/util/EmbeddedTaggedRateLimiter.java   |  9 +++++--
 .../descriptors/TestRemoteTableDescriptor.java  | 26 ++++++++++++++------
 .../util/TestEmbeddedTaggedRateLimiter.java     | 18 +++++++++-----
 3 files changed, 38 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4d6ff989/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
index a91d663..2bbbf8d 100644
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
@@ -23,6 +23,8 @@ import com.google.common.base.Stopwatch;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.job.model.JobModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,8 +111,11 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
     this.tagToRateLimiterMap = Collections.unmodifiableMap(tagToTargetRateMap.entrySet().stream()
         .map(e -> {
             String tag = e.getKey();
-            int numTasksInContainer = context.getContainerContext().getContainerModel().getTasks().keySet().size();
-            int effectiveRate = e.getValue() / numTasksInContainer;
+            JobModel jobModel = ((TaskContextImpl) context.getTaskContext()).getJobModel();
+            int numTasks = jobModel.getContainers().values().stream()
+                .mapToInt(cm -> cm.getTasks().size())
+                .sum();
+            int effectiveRate = e.getValue() / numTasks;
             TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
             LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d", taskName, tag,
                 effectiveRate));

http://git-wip-us.apache.org/repos/asf/samza/blob/4d6ff989/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index e2841b6..c6d969e 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -21,9 +21,11 @@ package org.apache.samza.table.remote.descriptors;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -48,7 +50,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -124,12 +125,15 @@ public class TestRemoteTableDescriptor {
   }
 
   private Context createMockContext() {
-    Context context = new MockContext();
+    Context context = mock(Context.class);
+
+    TaskContextImpl taskContext = mock(TaskContextImpl.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
 
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(Matchers.anyString(), Matchers.anyString());
-    doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(Matchers.anyString(), Matchers.anyString());
-    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+    when(metricsRegistry.newTimer(Matchers.anyString(), Matchers.anyString())).thenReturn(mock(Timer.class));
+    when(metricsRegistry.newCounter(Matchers.anyString(), Matchers.anyString())).thenReturn(mock(Counter.class));
+    when(taskContext.getTaskMetricsRegistry()).thenReturn(metricsRegistry);
 
     TaskName taskName = new TaskName("MyTask");
     TaskModel taskModel = mock(TaskModel.class);
@@ -138,7 +142,15 @@ public class TestRemoteTableDescriptor {
 
     ContainerModel containerModel = mock(ContainerModel.class);
     when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
-    when(context.getContainerContext().getContainerModel()).thenReturn(containerModel);
+
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(containerContext.getContainerModel()).thenReturn(containerModel);
+    when(context.getContainerContext()).thenReturn(containerContext);
+
+    String containerId = "container-1";
+    JobModel jobModel = mock(JobModel.class);
+    when(taskContext.getJobModel()).thenReturn(jobModel);
+    when(jobModel.getContainers()).thenReturn(ImmutableMap.of(containerId, containerModel));
 
     return context;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4d6ff989/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
index 8559bb3..3b11765 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
@@ -18,11 +18,11 @@
  */
 package org.apache.samza.util;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -209,14 +209,20 @@ public class TestEmbeddedTaggedRateLimiter {
   }
 
   static void initRateLimiter(RateLimiter rateLimiter) {
-    Context context = new MockContext(mock(Config.class));
-    when(context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
-    ContainerModel containerModel = mock(ContainerModel.class);
     Map<TaskName, TaskModel> tasks = IntStream.range(0, NUMBER_OF_TASKS)
         .mapToObj(i -> new TaskName("task-" + i))
         .collect(Collectors.toMap(Function.identity(), x -> mock(TaskModel.class)));
+    ContainerModel containerModel = mock(ContainerModel.class);
     when(containerModel.getTasks()).thenReturn(tasks);
-    when(context.getContainerContext().getContainerModel()).thenReturn(containerModel);
+    JobModel jobModel = mock(JobModel.class);
+    Map<String, ContainerModel> containerModelMap = new HashMap<>();
+    containerModelMap.put("container-1", containerModel);
+    when(jobModel.getContainers()).thenReturn(containerModelMap);
+    Context context = mock(Context.class);
+    TaskContextImpl taskContext = mock(TaskContextImpl.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
+    when(taskContext.getJobModel()).thenReturn(jobModel);
+    when(context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
     rateLimiter.init(context);
   }
 }