You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/29 15:53:50 UTC
[metron] 03/04: METRON-2222 Remove Overrides for Storm 1.0.x
(nickwallen) closes apache/metron#1545
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch METRON-2223
in repository https://gitbox.apache.org/repos/asf/metron.git
commit 522de8410df14c314ea3b6597a9d6e9c2f0e8353
Author: nickwallen <ni...@apache.org>
AuthorDate: Tue Oct 29 11:07:30 2019 -0400
METRON-2222 Remove Overrides for Storm 1.0.x (nickwallen) closes apache/metron#1545
---
.../metron-storm-kafka-override/pom.xml | 165 ----------
.../spout/KafkaSpoutRetryExponentialBackoff.java | 336 ---------------------
.../apache/storm/kafka/spout/internal/Timer.java | 65 ----
.../storm/kafka/spout/internal/TimerTest.java | 36 ---
metron-platform/metron-storm-kafka/pom.xml | 87 +++++-
metron-platform/pom.xml | 1 -
6 files changed, 82 insertions(+), 608 deletions(-)
diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml
deleted file mode 100644
index fe086f2..0000000
--- a/metron-platform/metron-storm-kafka-override/pom.xml
+++ /dev/null
@@ -1,165 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software
- Foundation (ASF) under one or more contributor license agreements. See the
- NOTICE file distributed with this work for additional information regarding
- copyright ownership. The ASF licenses this file to You under the Apache License,
- Version 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-platform</artifactId>
- <version>0.7.2</version>
- </parent>
- <artifactId>metron-storm-kafka-override</artifactId>
- <name>metron-storm-kafka-override</name>
- <description>Components that extend the Storm/Kafka spout</description>
- <url>https://metron.apache.org/</url>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <commons.config.version>1.10</commons.config.version>
- <guava_version>16.0.1</guava_version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava_version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka-client</artifactId>
- <version>${global_storm_kafka_version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${global_kafka_version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-slf4j-impl</artifactId>
- <groupId>org.apache.logging.log4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>${global_shade_version}</version>
- <configuration>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <shadedClassifierName>uber</shadedClassifierName>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <relocations>
- <relocation>
- <pattern>com.google.common</pattern>
- <shadedPattern>org.apache.metron.guava.${guava_version}</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.thirdparty</pattern>
- <shadedPattern>org.apache.metron.storm.kafka.override.guava.thirdparty</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons.lang</pattern>
- <shadedPattern>org.apache.metron.storm.kafka.override.commons.lang</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.fasterxml</pattern>
- <shadedPattern>org.apache.metron.storm.kafka.override.com.fasterxml</shadedPattern>
- </relocation>
- </relocations>
- <artifactSet>
- <excludes>
- <exclude>storm:storm-core:*</exclude>
- <exclude>storm:storm-lib:*</exclude>
- <exclude>org.slf4j.impl*</exclude>
- <exclude>org.slf4j:slf4j-log4j*</exclude>
- </excludes>
- </artifactSet>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
- <resources>
- <resource>.yaml</resource>
- <resource>LICENSE.txt</resource>
- <resource>ASL2.0</resource>
- <resource>NOTICE.txt</resource>
- </resources>
- </transformer>
- <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
- <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
- <addHeader>false</addHeader>
- <projectName>${project.name}</projectName>
- </transformer-->
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass></mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
-</project>
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
deleted file mode 100644
index 439188b..0000000
--- a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/*
- This file is pulled from Apache Storm, with some modification to support lower version of
- Apache Storm.
-
- - Time.nanoTime() is introduced in Storm 1.1.0 so we changed to System.nanoTime()
- -- Time.nanoTime() calls System.nanoTime() when it's not in time simulation mode.
-*/
-
-/**
- * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay)
- */
-public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
- private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
-
- private final TimeInterval initialDelay;
- private final TimeInterval delayPeriod;
- private final TimeInterval maxDelay;
- private final int maxRetries;
-
- //This class assumes that there is at most one retry schedule per message id in this set at a time.
- private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
- private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
-
- /**
- * Comparator ordering by timestamp
- */
- private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
- @Override
- public int compare(RetrySchedule entry1, RetrySchedule entry2) {
- int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
-
- if(result == 0) {
- //TreeSet uses compareTo instead of equals() for the Set contract
- //Ensure that we can save two retry schedules with the same timestamp
- result = entry1.hashCode() - entry2.hashCode();
- }
- return result;
- }
- }
-
- private class RetrySchedule {
- private final KafkaSpoutMessageId msgId;
- private long nextRetryTimeNanos;
-
- public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
- this.msgId = msgId;
- this.nextRetryTimeNanos = nextRetryTimeNanos;
- LOG.debug("Created {}", this);
- }
-
- public void setNextRetryTimeNanos() {
- nextRetryTimeNanos = nextTime(msgId);
- LOG.debug("Updated {}", this);
- }
-
- public boolean retry(long currentTimeNanos) {
- return nextRetryTimeNanos <= currentTimeNanos;
- }
-
- @Override
- public String toString() {
- return "RetrySchedule{" +
- "msgId=" + msgId +
- ", nextRetryTimeNanos=" + nextRetryTimeNanos +
- '}';
- }
-
- public KafkaSpoutMessageId msgId() {
- return msgId;
- }
-
- public long nextRetryTimeNanos() {
- return nextRetryTimeNanos;
- }
- }
-
- public static class TimeInterval implements Serializable {
- private final long lengthNanos;
- private final TimeUnit timeUnit;
- private final long length;
-
- /**
- * @param length length of the time interval in the units specified by {@link TimeUnit}
- * @param timeUnit unit used to specify a time interval on which to specify a time unit
- */
- public TimeInterval(long length, TimeUnit timeUnit) {
- this.lengthNanos = timeUnit.toNanos(length);
- this.timeUnit = timeUnit;
- this.length = length;
- }
-
- public static TimeInterval seconds(long length) {
- return new TimeInterval(length, TimeUnit.SECONDS);
- }
-
- public static TimeInterval milliSeconds(long length) {
- return new TimeInterval(length, TimeUnit.MILLISECONDS);
- }
-
- public static TimeInterval microSeconds(long length) {
- return new TimeInterval(length, TimeUnit.MICROSECONDS);
- }
-
- public long lengthNanos() {
- return lengthNanos;
- }
-
- public TimeUnit timeUnit() {
- return timeUnit;
- }
-
- @Override
- public String toString() {
- return "TimeInterval{" +
- "length=" + length +
- ", timeUnit=" + timeUnit +
- '}';
- }
- }
-
- /**
- * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay).
- *
- * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
- * polled records in favor of processing more records.
- *
- * @param initialDelay initial delay of the first retry
- * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
- * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
- * @param maxDelay maximum amount of time waiting before retrying
- *
- */
- public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
- this.initialDelay = initialDelay;
- this.delayPeriod = delayPeriod;
- this.maxRetries = maxRetries;
- this.maxDelay = maxDelay;
- LOG.debug("Instantiated {}", this.toStringImpl());
- }
-
- @Override
- public Map<TopicPartition, Long> earliestRetriableOffsets() {
- final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
- final long currentTimeNanos = System.nanoTime();
- for (RetrySchedule retrySchedule : retrySchedules) {
- if (retrySchedule.retry(currentTimeNanos)) {
- final KafkaSpoutMessageId msgId = retrySchedule.msgId;
- final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
- final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
- if(currentLowestOffset != null) {
- tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
- } else {
- tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
- }
- } else {
- break; // Stop searching as soon as passed current time
- }
- }
- LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
- return tpToEarliestRetriableOffset;
- }
-
- @Override
- public boolean isReady(KafkaSpoutMessageId msgId) {
- boolean retry = false;
- if (isScheduled(msgId)) {
- final long currentTimeNanos = System.nanoTime();
- for (RetrySchedule retrySchedule : retrySchedules) {
- if (retrySchedule.retry(currentTimeNanos)) {
- if (retrySchedule.msgId.equals(msgId)) {
- retry = true;
- LOG.debug("Found entry to retry {}", retrySchedule);
- break; //Stop searching if the message is known to be ready for retry
- }
- } else {
- LOG.debug("Entry to retry not found {}", retrySchedule);
- break; // Stop searching as soon as passed current time
- }
- }
- }
- return retry;
- }
-
- @Override
- public boolean isScheduled(KafkaSpoutMessageId msgId) {
- return toRetryMsgs.contains(msgId);
- }
-
- @Override
- public boolean remove(KafkaSpoutMessageId msgId) {
- boolean removed = false;
- if (isScheduled(msgId)) {
- toRetryMsgs.remove(msgId);
- for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
- final RetrySchedule retrySchedule = iterator.next();
- if (retrySchedule.msgId().equals(msgId)) {
- iterator.remove();
- removed = true;
- break; //There is at most one schedule per message id
- }
- }
- }
- LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
- LOG.trace("Current state {}", retrySchedules);
- return removed;
- }
-
- @Override
- public boolean retainAll(Collection<TopicPartition> topicPartitions) {
- boolean result = false;
- for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
- final RetrySchedule retrySchedule = rsIterator.next();
- final KafkaSpoutMessageId msgId = retrySchedule.msgId;
- final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
- if (!topicPartitions.contains(tpRetry)) {
- rsIterator.remove();
- toRetryMsgs.remove(msgId);
- LOG.debug("Removed {}", retrySchedule);
- LOG.trace("Current state {}", retrySchedules);
- result = true;
- }
- }
- return result;
- }
-
- @Override
- public boolean schedule(KafkaSpoutMessageId msgId) {
- if (msgId.numFails() > maxRetries) {
- LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
- return false;
- } else {
- //Remove existing schedule for the message id
- remove(msgId);
- final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
- retrySchedules.add(retrySchedule);
- toRetryMsgs.add(msgId);
- LOG.debug("Scheduled. {}", retrySchedule);
- LOG.trace("Current state {}", retrySchedules);
- return true;
- }
- }
-
- @Override
- public int readyMessageCount() {
- int count = 0;
- final long currentTimeNanos = System.nanoTime();
- for (RetrySchedule retrySchedule : retrySchedules) {
- if (retrySchedule.retry(currentTimeNanos)) {
- ++count;
- } else {
- break; //Stop counting when past current time
- }
- }
- return count;
- }
-
- @Override
- public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
- KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
- if (isScheduled(msgId)) {
- for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
- if (originalMsgId.equals(msgId)) {
- return originalMsgId;
- }
- }
- }
- return msgId;
- }
-
- // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
- private long nextTime(KafkaSpoutMessageId msgId) {
- Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
- final long currentTimeNanos = System.nanoTime();
- final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
- ? currentTimeNanos + initialDelay.lengthNanos
- : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
- return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
- }
-
- @Override
- public String toString() {
- return toStringImpl();
- }
-
- private String toStringImpl() {
- //This is here to avoid an overridable call in the constructor
- return "KafkaSpoutRetryExponentialBackoff{" +
- "delay=" + initialDelay +
- ", ratio=" + delayPeriod +
- ", maxRetries=" + maxRetries +
- ", maxRetryDelay=" + maxDelay +
- '}';
- }
-}
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
deleted file mode 100644
index 0b045c0..0000000
--- a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.internal;
-
-import java.util.concurrent.TimeUnit;
-
-/*
- This file is pulled from Apache Storm, with some modification to support lower version of
- Apache Storm.
-
- - Time.nanoTime() is introduced in Storm 1.1.0 so we changed to System.nanoTime()
- -- Time.nanoTime() calls System.nanoTime() when it's not in time simulation mode.
-*/
-
-public class Timer {
- private final long delay;
- private final long period;
- private final TimeUnit timeUnit;
- private final long periodNanos;
- private long start;
-
- public Timer(long delay, long period, TimeUnit timeUnit) {
- this.delay = delay;
- this.period = period;
- this.timeUnit = timeUnit;
- this.periodNanos = timeUnit.toNanos(period);
- this.start = System.nanoTime() + timeUnit.toNanos(delay);
- }
-
- public long period() {
- return this.period;
- }
-
- public long delay() {
- return this.delay;
- }
-
- public TimeUnit getTimeUnit() {
- return this.timeUnit;
- }
-
- public boolean isExpiredResetOnTrue() {
- boolean expired = System.nanoTime() - this.start >= this.periodNanos;
- if(expired) {
- this.start = System.nanoTime();
- }
-
- return expired;
- }
-}
diff --git a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
deleted file mode 100644
index 0d49ae1..0000000
--- a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.internal;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-public class TimerTest {
-
- @Test
- public void testReset() throws InterruptedException {
- Timer t = new Timer(0, 2, TimeUnit.SECONDS);
- Thread.sleep(1000);
- Assert.assertFalse(t.isExpiredResetOnTrue());
- Thread.sleep(1000);
- Assert.assertTrue(t.isExpiredResetOnTrue());
- }
-
-}
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
index 17d3abc..5b601d8 100644
--- a/metron-platform/metron-storm-kafka/pom.xml
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -28,12 +28,19 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<commons.config.version>1.10</commons.config.version>
+ <!-- the version of Guava used by storm-kafka-clientis < ${global_guava_version}-->
+ <guava_version>16.0.1</guava_version>
</properties>
<dependencies>
<dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-storm-kafka-override</artifactId>
- <version>${project.parent.version}</version>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka-client</artifactId>
+ <version>${global_storm_kafka_version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -77,11 +84,81 @@
<groupId>org.apache.metron</groupId>
<artifactId>stellar-common</artifactId>
<version>${project.parent.version}</version>
- <scope>provided</scope>
</dependency>
</dependencies>
-
<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${global_shade_version}</version>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>uber</shadedClassifierName>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava.${guava_version}</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.thirdparty</pattern>
+ <shadedPattern>org.apache.metron.storm.kafka.override.guava.thirdparty</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <excludes>
+ <exclude>storm:storm-core:*</exclude>
+ <exclude>storm:storm-lib:*</exclude>
+ <exclude>org.slf4j.impl*</exclude>
+ <exclude>org.slf4j:slf4j-log4j*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resources>
+ <resource>.yaml</resource>
+ <resource>LICENSE.txt</resource>
+ <resource>ASL2.0</resource>
+ <resource>NOTICE.txt</resource>
+ </resources>
+ </transformer>
+ <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
+ <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+ <addHeader>false</addHeader>
+ <projectName>${project.name}</projectName>
+ </transformer-->
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 1f028b5..562204c 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -58,7 +58,6 @@
<module>elasticsearch-shaded</module>
<module>metron-elasticsearch</module>
<module>metron-storm-kafka</module>
- <module>metron-storm-kafka-override</module>
<module>metron-zookeeper</module>
<module>metron-parsing</module>
<module>metron-hbase-server</module>