You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/14 00:22:43 UTC
[1/2] incubator-beam git commit: KafkaIO reader should set
consumedOffset even before reading the first record.
Repository: incubator-beam
Updated Branches:
refs/heads/master 3c731707b -> 93d2e374c
KafkaIO reader should set consumedOffset even before reading the first record.
* Distinguish between uninitialized consumed offset and consumed offset for
an empty partition.
* Add test to verify we handle empty partitions better.
* Fix how we were using MockConsumer. checkpoint test was actually doing what it was supposed to.
* Avoid cpu spinning in case of a test failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08c2f1c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08c2f1c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08c2f1c3
Branch: refs/heads/master
Commit: 08c2f1c361eb17f47794e805df910c3dad6a9d43
Parents: 3c73170
Author: Raghu Angadi <ra...@google.com>
Authored: Fri Oct 7 17:18:25 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Oct 13 17:21:52 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/kafka/KafkaCheckpointMark.java | 8 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 60 +++----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 162 +++++++++++++++----
3 files changed, 165 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 664bb6f..4f9e96f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -56,19 +56,19 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Seri
*/
public static class PartitionMark implements Serializable {
private final TopicPartition topicPartition;
- private final long offset;
+ private final long nextOffset;
public PartitionMark(TopicPartition topicPartition, long offset) {
this.topicPartition = topicPartition;
- this.offset = offset;
+ this.nextOffset = offset;
}
public TopicPartition getTopicPartition() {
return topicPartition;
}
- public long getOffset() {
- return offset;
+ public long getNextOffset() {
+ return nextOffset;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e26f7c5..2030789 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -782,6 +782,8 @@ public class KafkaIO {
Executors.newSingleThreadScheduledExecutor();
private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+ private static final long UNINITIALIZED_OFFSET = -1;
+
/** watermark before any records have been read. */
private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
@@ -792,7 +794,7 @@ public class KafkaIO {
// maintains state of each assigned partition (buffered records, consumed offset, etc)
private static class PartitionState {
private final TopicPartition topicPartition;
- private long consumedOffset;
+ private long nextOffset;
private long latestOffset;
private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
@@ -800,15 +802,15 @@ public class KafkaIO {
private double avgRecordSize = 0;
private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
- PartitionState(TopicPartition partition, long offset) {
+ PartitionState(TopicPartition partition, long nextOffset) {
this.topicPartition = partition;
- this.consumedOffset = offset;
- this.latestOffset = -1;
+ this.nextOffset = nextOffset;
+ this.latestOffset = UNINITIALIZED_OFFSET;
}
// update consumedOffset and avgRecordSize
void recordConsumed(long offset, int size) {
- consumedOffset = offset;
+ nextOffset = offset + 1;
// this is always updated from single thread. probably not worth making it an AtomicDouble
if (avgRecordSize <= 0) {
@@ -825,14 +827,10 @@ public class KafkaIO {
synchronized long approxBacklogInBytes() {
// Note that is an an estimate of uncompressed backlog.
- // Messages on Kafka might be comressed.
- if (latestOffset < 0 || consumedOffset < 0) {
+ if (latestOffset < 0 || nextOffset < 0) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
- if (latestOffset <= consumedOffset || consumedOffset < 0) {
- return 0;
- }
- return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+ return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
}
}
@@ -846,7 +844,7 @@ public class KafkaIO {
partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
new Function<TopicPartition, PartitionState>() {
public PartitionState apply(TopicPartition tp) {
- return new PartitionState(tp, -1L);
+ return new PartitionState(tp, UNINITIALIZED_OFFSET);
}
}));
@@ -866,7 +864,7 @@ public class KafkaIO {
"checkpointed partition %s and assigned partition %s don't match",
ckptMark.getTopicPartition(), assigned);
- partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+ partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
}
}
}
@@ -925,18 +923,22 @@ public class KafkaIO {
consumer = source.consumerFactoryFn.apply(source.consumerConfig);
consumer.assign(source.assignedPartitions);
- // seek to consumedOffset + 1 if it is set
for (PartitionState p : partitionStates) {
- if (p.consumedOffset >= 0) {
- LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
- consumer.seek(p.topicPartition, p.consumedOffset + 1);
+ if (p.nextOffset != UNINITIALIZED_OFFSET) {
+ consumer.seek(p.topicPartition, p.nextOffset);
} else {
- LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
+ // nextOffset is unininitialized here, meaning start reading from latest record as of now
+ // ('latest' is the default, and is configurable). Remember the current position without
+ // waiting until the first record read. This ensures checkpoint is accurate even if the
+ // reader is closed before reading any records.
+ p.nextOffset = consumer.position(p.topicPartition);
}
+
+ LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
}
- // start consumer read loop.
- // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
+ // Start consumer read loop.
+ // Note that consumer is not thread safe, should not be accessed out side consumerPollLoop().
consumerPollThread.submit(
new Runnable() {
public void run() {
@@ -989,10 +991,10 @@ public class KafkaIO {
}
ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
- long consumed = pState.consumedOffset;
+ long expected = pState.nextOffset;
long offset = rawRecord.offset();
- if (consumed >= 0 && offset <= consumed) { // -- (a)
+ if (offset < expected) { // -- (a)
// this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
// should we check if the offset is way off from consumedOffset (say > 1M)?
LOG.warn("{}: ignoring already consumed offset {} for {}",
@@ -1001,9 +1003,9 @@ public class KafkaIO {
}
// sanity check
- if (consumed >= 0 && (offset - consumed) != 1) {
- LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
- this, pState.topicPartition, consumed, offset - consumed - 1);
+ if (offset != expected) {
+ LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
+ this, pState.topicPartition, expected, offset - expected);
}
if (curRecord == null) {
@@ -1059,11 +1061,11 @@ public class KafkaIO {
p.setLatestOffset(offset);
} catch (Exception e) {
LOG.warn("{}: exception while fetching latest offsets. ignored.", this, e);
- p.setLatestOffset(-1L); // reset
+ p.setLatestOffset(UNINITIALIZED_OFFSET); // reset
}
- LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
- this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
+ LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
+ this, p.topicPartition, p.latestOffset, p.nextOffset, p.avgRecordSize);
}
LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
@@ -1086,7 +1088,7 @@ public class KafkaIO {
Lists.transform(partitionStates,
new Function<PartitionState, PartitionMark>() {
public PartitionMark apply(PartitionState p) {
- return new PartitionMark(p.topicPartition, p.consumedOffset);
+ return new PartitionMark(p.topicPartition, p.nextOffset);
}
}
)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 772efe1..67aa675 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
@@ -35,6 +36,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
@@ -69,6 +72,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -78,7 +82,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests of {@link KafkaSource}.
+ * Tests of {@link KafkaIO}.
*/
@RunWith(JUnit4.class)
public class KafkaIOTest {
@@ -96,7 +100,8 @@ public class KafkaIOTest {
// Update mock consumer with records distributed among the given topics, each with given number
// of partitions. Records are assigned in round-robin order among the partitions.
private static MockConsumer<byte[], byte[]> mkMockConsumer(
- List<String> topics, int partitionsPerTopic, int numElements) {
+ List<String> topics, int partitionsPerTopic, int numElements,
+ OffsetResetStrategy offsetResetStrategy) {
final List<TopicPartition> partitions = new ArrayList<>();
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -105,8 +110,10 @@ public class KafkaIOTest {
for (String topic : topics) {
List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
for (int i = 0; i < partitionsPerTopic; i++) {
- partitions.add(new TopicPartition(topic, i));
+ TopicPartition tp = new TopicPartition(topic, i);
+ partitions.add(tp);
partIds.add(new PartitionInfo(topic, i, null, null, null));
+ records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
}
partitionMap.put(topic, partIds);
}
@@ -118,30 +125,28 @@ public class KafkaIOTest {
int pIdx = i % numPartitions;
TopicPartition tp = partitions.get(pIdx);
- if (!records.containsKey(tp)) {
- records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
- }
records.get(tp).add(
- new ConsumerRecord<byte[], byte[]>(
+ new ConsumerRecord<>(
tp.topic(),
tp.partition(),
offsets[pIdx]++,
- ByteBuffer.wrap(new byte[8]).putInt(i).array(), // key is 4 byte record id
+ ByteBuffer.wrap(new byte[4]).putInt(i).array(), // key is 4 byte record id
ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id
}
- MockConsumer<byte[], byte[]> consumer =
- new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
- // override assign() to add records that belong to the assigned partitions.
- public void assign(List<TopicPartition> assigned) {
+ // This is updated when reader assigns partitions.
+ final AtomicReference<List<TopicPartition>> assignedPartitions =
+ new AtomicReference<>(Collections.<TopicPartition>emptyList());
+
+ final MockConsumer<byte[], byte[]> consumer =
+ new MockConsumer<byte[], byte[]>(offsetResetStrategy) {
+ // override assign() in order to set offset limits & to save assigned partitions.
+ public void assign(final List<TopicPartition> assigned) {
super.assign(assigned);
+ assignedPartitions.set(ImmutableList.copyOf(assigned));
for (TopicPartition tp : assigned) {
- for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
- addRecord(r);
- }
updateBeginningOffsets(ImmutableMap.of(tp, 0L));
updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
- seek(tp, 0);
}
}
};
@@ -150,6 +155,29 @@ public class KafkaIOTest {
consumer.updatePartitions(topic, partitionMap.get(topic));
}
+ // MockConsumer does not maintain any relationship between partition seek position and the
+ // records added. e.g. if we add 10 records to a partition and then seek to end of the
+ // partition, MockConsumer is still going to return the 10 records in next poll. It is
+ // our responsibility to make sure currently enqueued records sync with partition offsets.
+ // The following task will be called inside each invocation to MockConsumer.poll().
+ // We enqueue only the records with the offset >= partition's current position.
+ Runnable recordEnquerTask = new Runnable() {
+ @Override
+ public void run() {
+ // add all the records with offset >= current partition position.
+ for (TopicPartition tp : assignedPartitions.get()) {
+ long curPos = consumer.position(tp);
+ for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+ if (r.offset() >= curPos) {
+ consumer.addRecord(r);
+ }
+ }
+ }
+ consumer.schedulePollTask(this);
+ }
+ };
+
+ consumer.schedulePollTask(recordEnquerTask);
return consumer;
}
@@ -158,15 +186,20 @@ public class KafkaIOTest {
private final List<String> topics;
private final int partitionsPerTopic;
private final int numElements;
+ private final OffsetResetStrategy offsetResetStrategy;
- public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+ public ConsumerFactoryFn(List<String> topics,
+ int partitionsPerTopic,
+ int numElements,
+ OffsetResetStrategy offsetResetStrategy) {
this.topics = topics;
this.partitionsPerTopic = partitionsPerTopic;
this.numElements = numElements;
+ this.offsetResetStrategy = offsetResetStrategy;
}
public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
- return mkMockConsumer(topics, partitionsPerTopic, numElements);
+ return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
}
}
@@ -183,7 +216,8 @@ public class KafkaIOTest {
KafkaIO.Read<Integer, Long> reader = KafkaIO.read()
.withBootstrapServers("none")
.withTopics(topics)
- .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
.withKeyCoder(BigEndianIntegerCoder.of())
.withValueCoder(BigEndianLongCoder.of())
.withMaxNumRecords(numElements);
@@ -257,7 +291,8 @@ public class KafkaIOTest {
KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
.withBootstrapServers("none")
.withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
- .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
.withValueCoder(BigEndianLongCoder.of())
.withMaxNumRecords(numElements / 10);
@@ -357,7 +392,14 @@ public class KafkaIOTest {
private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
while (!reader.advance()) {
// very rarely will there be more than one attempts.
- // in case of a bug we might end up looping forever, and test will fail with a timeout.
+ // In case of a bug we might end up looping forever, and test will fail with a timeout.
+
+ // Avoid hard cpu spinning in case of a test failure.
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
@@ -373,14 +415,11 @@ public class KafkaIOTest {
.get(0);
UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
- final int numToSkip = 3;
+ final int numToSkip = 20; // one from each partition.
// advance numToSkip elements
- if (!reader.start()) {
- advanceOnce(reader);
- }
-
- for (long l = 0; l < numToSkip - 1; ++l) {
+ reader.start();
+ for (int l = 1; l < numToSkip; ++l) {
advanceOnce(reader);
}
@@ -396,10 +435,7 @@ public class KafkaIOTest {
// Confirm that we get the next elements in sequence.
// This also confirms that Reader interleaves records from each partitions by the reader.
- if (!reader.start()) {
- advanceOnce(reader);
- }
-
+ reader.start();
for (int i = numToSkip; i < numElements; i++) {
assertEquals(i, (long) reader.getCurrent().getKV().getValue());
assertEquals(i, reader.getCurrentTimestamp().getMillis());
@@ -410,6 +446,68 @@ public class KafkaIOTest {
}
@Test
+ public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
+ // Similar to testUnboundedSourceCheckpointMark(), but verifies that source resumes
+ // properly from empty partitions, without missing messages added since checkpoint.
+
+ // Initialize consumer with fewer elements than number of partitions so that some are empty.
+ int initialNumElements = 5;
+ UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
+ mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
+ .makeSource()
+ .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+ .get(0);
+
+ UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
+
+ reader.start();
+ for (int l = 1; l < initialNumElements; ++l) {
+ advanceOnce(reader);
+ }
+
+ // Checkpoint and restart, and confirm that the source continues correctly.
+ KafkaCheckpointMark mark = CoderUtils.clone(
+ source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+
+ // Create another source with MockConsumer with OffsetResetStrategy.LATEST. This insures that
+ // the reader need to explicitly need to seek to first offset for partitions that were empty.
+
+ int numElements = 100; // all the 20 partitions will have elements
+ List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+ source = KafkaIO.read()
+ .withBootstrapServers("none")
+ .withTopics(topics)
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ topics, 10, numElements, OffsetResetStrategy.LATEST))
+ .withKeyCoder(BigEndianIntegerCoder.of())
+ .withValueCoder(BigEndianLongCoder.of())
+ .withMaxNumRecords(numElements)
+ .withTimestampFn(new ValueAsTimestampFn())
+ .makeSource()
+ .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+ .get(0);
+
+ reader = source.createReader(null, mark);
+
+ reader.start();
+
+ // Verify in any order. As the partitions are unevenly read, the returned records are not in a
+ // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin oder.
+
+ List<Long> expected = new ArrayList<>();
+ List<Long> actual = new ArrayList<>();
+ for (long i = initialNumElements; i < numElements; i++) {
+ expected.add(i);
+ actual.add(reader.getCurrent().getKV().getValue());
+ if ((i + 1) < numElements) {
+ advanceOnce(reader);
+ }
+ }
+ assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
+ }
+
+ @Test
public void testSink() throws Exception {
// Simply read from kafka source and write to kafka sink. Then verify the records
// are correctly published to mock kafka producer.
@@ -440,7 +538,7 @@ public class KafkaIOTest {
completionThread.shutdown();
verifyProducerRecords(topic, numElements, false);
- }
+ }
}
@Test
[2/2] incubator-beam git commit: Closes #1071
Posted by dh...@apache.org.
Closes #1071
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93d2e374
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93d2e374
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93d2e374
Branch: refs/heads/master
Commit: 93d2e374c31162bb28c474c3d454e6671b02a159
Parents: 3c73170 08c2f1c
Author: Dan Halperin <dh...@google.com>
Authored: Thu Oct 13 17:22:29 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Oct 13 17:22:29 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/kafka/KafkaCheckpointMark.java | 8 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 60 +++----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 162 +++++++++++++++----
3 files changed, 165 insertions(+), 65 deletions(-)
----------------------------------------------------------------------