You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ka...@apache.org on 2018/04/03 18:54:47 UTC
[incubator-heron] branch master updated: While emitting in spout,
adhere to the batch size limit (#2798)
This is an automated email from the ASF dual-hosted git repository.
karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 1c47ef8 While emitting in spout, adhere to the batch size limit (#2798)
1c47ef8 is described below
commit 1c47ef86ba57eb612cf965285211dae1a3ce3196
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Apr 3 11:54:44 2018 -0700
While emitting in spout, adhere to the batch size limit (#2798)
* While emitting in spout, adhere to the batch size limit
* Added accessor
* Added missing imports
* Make sure that C++ instance also adheres to batch emit size constraint
---
heron/common/src/cpp/config/heron-internals-config-reader.cpp | 5 +++++
heron/common/src/cpp/config/heron-internals-config-reader.h | 3 +++
heron/common/src/cpp/config/heron-internals-config-vars.cpp | 2 ++
heron/common/src/cpp/config/heron-internals-config-vars.h | 3 +++
heron/instance/src/cpp/slave/outgoing-tuple-collection.h | 2 +-
heron/instance/src/cpp/spoutimpl/spout-instance.cpp | 8 +++++++-
heron/instance/src/cpp/spoutimpl/spout-instance.h | 2 ++
heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h | 1 +
.../java/com/twitter/heron/instance/AbstractOutputCollector.java | 7 +++++++
.../src/java/com/twitter/heron/instance/spout/SpoutInstance.java | 8 ++++++++
10 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
index 05c4b49..174dcb8 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
@@ -317,6 +317,11 @@ int HeronInternalsConfigReader::GetHeronInstanceEmitBatchTimeMs() {
.as<int>();
}
+int HeronInternalsConfigReader::GetHeronInstanceEmitBatchSize() {
+ return config_[HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_SIZE]
+ .as<int>();
+}
+
int HeronInternalsConfigReader::GetHeronInstanceSetDataTupleCapacity() {
return config_[HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY]
.as<int>();
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h
index 696c58f..d5a34a1 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.h
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.h
@@ -216,6 +216,9 @@ class HeronInternalsConfigReader : public YamlFileReader {
// The maximum time in ms for an spout instance to emit tuples per attempt
int GetHeronInstanceEmitBatchTimeMs();
+ // The maximum number of bytes for a spout instance to emit tuples per attempt
+ int GetHeronInstanceEmitBatchSize();
+
// The maximum # of data tuple to batch in a HeronDataTupleSet protobuf
int GetHeronInstanceSetDataTupleCapacity();
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
index 373ba4a..0906eaa 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
@@ -128,6 +128,8 @@ const sp_string HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_WRITE_QU
"heron.instance.internal.spout.write.queue.capacity";
const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_TIME_MS =
"heron.instance.emit.batch.time.ms";
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_SIZE =
+ "heron.instance.emit.batch.size.bytes";
const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY =
"heron.instance.set.data.tuple.capacity";
const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_SIZE_BYTES =
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h
index bcab89a..5ac3752 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.h
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.h
@@ -206,6 +206,9 @@ class HeronInternalsConfigVars {
// The maximum time in ms for an spout instance to emit tuples per attempt
static const sp_string HERON_INSTANCE_EMIT_BATCH_TIME_MS;
+ // The maximum number of bytes for a spout instance to emit tuples per attempt
+ static const sp_string HERON_INSTANCE_EMIT_BATCH_SIZE;
+
// The maximum # of data tuple to batch in a HeronDataTupleSet protobuf
static const sp_string HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY;
diff --git a/heron/instance/src/cpp/slave/outgoing-tuple-collection.h b/heron/instance/src/cpp/slave/outgoing-tuple-collection.h
index b9d722b..0acc76b 100644
--- a/heron/instance/src/cpp/slave/outgoing-tuple-collection.h
+++ b/heron/instance/src/cpp/slave/outgoing-tuple-collection.h
@@ -43,12 +43,12 @@ class OutgoingTupleCollection {
int tupleSize);
int64_t getTotalDataTuplesEmitted() const;
+ int64_t getTotalDataSizeEmitted() const;
private:
void initNewDataTuple(const std::string& streamId);
void initNewControlTuple();
void flushRemaining();
- int64_t getTotalDataSizeEmitted() const;
std::string componentName_;
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
index 422776a..ed36430 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
@@ -40,6 +40,8 @@ SpoutInstance::SpoutInstance(EventLoop* eventLoop,
->GetHeronInstanceInternalSpoutWriteQueueCapacity();
maxEmitBatchIntervalMs_ = config::HeronInternalsConfigReader::Instance()
->GetHeronInstanceEmitBatchTimeMs();
+ maxEmitBatchSize_ = config::HeronInternalsConfigReader::Instance()
+ ->GetHeronInstanceEmitBatchSize();
ackingEnabled_ = taskContext->isAckingEnabled();
enableMessageTimeouts_ = taskContext->enableMessageTimeouts();
lookForTimeoutsTimer_ = -1;
@@ -61,7 +63,8 @@ SpoutInstance::SpoutInstance(EventLoop* eventLoop,
collector_.reset(new SpoutOutputCollectorImpl(serializer_, taskContext_, dataFromSlave_));
LOG(INFO) << "Instantiated spout for component " << taskContext->getThisComponentName()
<< " with task_id " << taskContext->getThisTaskId() << " and maxWriteBufferSize_ "
- << maxWriteBufferSize_ << " and maxEmitBatchIntervalMs " << maxEmitBatchIntervalMs_;
+ << maxWriteBufferSize_ << " and maxEmitBatchIntervalMs " << maxEmitBatchIntervalMs_
+ << " and maxEmitBatchSize " << maxEmitBatchSize_;
}
SpoutInstance::~SpoutInstance() {
@@ -131,6 +134,7 @@ void SpoutInstance::produceTuple() {
int maxSpoutPending = atoi(taskContext_->getConfig()
->get(api::config::Config::TOPOLOGY_MAX_SPOUT_PENDING).c_str());
int64_t totalTuplesEmitted = collector_->getTotalDataTuplesEmitted();
+ int64_t totalBytesEmitted = collector_->getTotalDataBytesEmitted();
int64_t startTime = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
int64_t startOfCall = startTime;
@@ -141,9 +145,11 @@ void SpoutInstance::produceTuple() {
std::chrono::system_clock::now().time_since_epoch()).count();
metrics_->nextTuple(currentTime - startOfCall);
int64_t newTotalTuplesEmitted = collector_->getTotalDataTuplesEmitted();
+ int64_t newTotalBytesEmitted = collector_->getTotalDataBytesEmitted();
if (newTotalTuplesEmitted == totalTuplesEmitted) break;
totalTuplesEmitted = newTotalTuplesEmitted;
if (currentTime - startTime > maxEmitBatchIntervalMs_ * 1000 * 1000) break;
+ if (newTotalBytesEmitted - totalBytesEmitted > maxEmitBatchSize_) break;
startOfCall = currentTime;
}
}
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.h b/heron/instance/src/cpp/spoutimpl/spout-instance.h
index 462e14a..e83498e 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.h
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.h
@@ -74,6 +74,8 @@ class SpoutInstance : public InstanceBase {
int maxWriteBufferSize_;
// This is the max time to spend in emitting tuple in one go
int maxEmitBatchIntervalMs_;
+ // This is the max number of bytes to emit in one go
+ int maxEmitBatchSize_;
};
} // namespace instance
diff --git a/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h b/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h
index 7d19433..da953e6 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h
+++ b/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h
@@ -47,6 +47,7 @@ class SpoutOutputCollectorImpl : public api::spout::ISpoutOutputCollector {
virtual void reportError(std::exception& except);
int64_t getTotalDataTuplesEmitted() const { return collector_->getTotalDataTuplesEmitted(); }
+ int64_t getTotalDataBytesEmitted() const { return collector_->getTotalDataSizeEmitted(); }
int64_t numInFlight() const { return inflightTuples_.size(); }
int getImmediateAcksSize() const { return immediateAcks_.size(); }
std::shared_ptr<RootTupleInfo> getImmediateAcksFront() {
diff --git a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
index b7ec1e7..0db45df 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
@@ -38,6 +38,7 @@ public class AbstractOutputCollector {
protected final ComponentMetrics metrics;
protected final boolean ackEnabled;
private long totalTuplesEmitted;
+ private long totalBytesEmitted;
private PhysicalPlanHelper helper;
/**
@@ -52,6 +53,7 @@ public class AbstractOutputCollector {
this.serializer = serializer;
this.metrics = metrics;
this.totalTuplesEmitted = 0;
+ this.totalBytesEmitted = 0;
this.helper = helper;
Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
@@ -117,6 +119,10 @@ public class AbstractOutputCollector {
return totalTuplesEmitted;
}
+ public long getTotalBytesEmitted() {
+ return totalBytesEmitted;
+ }
+
protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId,
List<Object> tuple,
Integer emitDirectTaskId) {
@@ -168,6 +174,7 @@ public class AbstractOutputCollector {
// submit to outputter
outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
totalTuplesEmitted++;
+ totalBytesEmitted += tupleSizeInBytes;
// Update metrics
metrics.emittedTuple(streamId);
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 100d293..28e1161 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -33,6 +33,7 @@ import com.twitter.heron.api.state.State;
import com.twitter.heron.api.topology.IStatefulComponent;
import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.api.utils.Utils;
+import com.twitter.heron.common.basics.ByteAmount;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
@@ -306,8 +307,10 @@ public class SpoutInstance implements IInstance {
int maxSpoutPending = TypeUtils.getInteger(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
long totalTuplesEmitted = collector.getTotalTuplesEmitted();
+ long totalBytesEmitted = collector.getTotalBytesEmitted();
Duration instanceEmitBatchTime = systemConfig.getInstanceEmitBatchTime();
+ ByteAmount instanceEmitBatchSize = systemConfig.getInstanceEmitBatchSize();
long startOfCycle = System.nanoTime();
@@ -326,6 +329,7 @@ public class SpoutInstance implements IInstance {
spoutMetrics.nextTuple(latency);
long newTotalTuplesEmitted = collector.getTotalTuplesEmitted();
+ long newTotalBytesEmitted = collector.getTotalBytesEmitted();
if (newTotalTuplesEmitted == totalTuplesEmitted) {
// No tuples to emit....
break;
@@ -337,6 +341,10 @@ public class SpoutInstance implements IInstance {
if (currentTime - startOfCycle - instanceEmitBatchTime.toNanos() > 0) {
break;
}
+ if (!ByteAmount.fromBytes(newTotalBytesEmitted - totalBytesEmitted)
+ .lessThan(instanceEmitBatchSize)) {
+ break;
+ }
}
}
--
To stop receiving notification emails like this one, please contact
karthikz@apache.org.