Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/29 15:53:50 UTC

[metron] 03/04: METRON-2222 Remove Overrides for Storm 1.0.x (nickwallen) closes apache/metron#1545

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch METRON-2223
in repository https://gitbox.apache.org/repos/asf/metron.git

commit 522de8410df14c314ea3b6597a9d6e9c2f0e8353
Author: nickwallen <ni...@apache.org>
AuthorDate: Tue Oct 29 11:07:30 2019 -0400

    METRON-2222 Remove Overrides for Storm 1.0.x (nickwallen) closes apache/metron#1545
---
 .../metron-storm-kafka-override/pom.xml            | 165 ----------
 .../spout/KafkaSpoutRetryExponentialBackoff.java   | 336 ---------------------
 .../apache/storm/kafka/spout/internal/Timer.java   |  65 ----
 .../storm/kafka/spout/internal/TimerTest.java      |  36 ---
 metron-platform/metron-storm-kafka/pom.xml         |  87 +++++-
 metron-platform/pom.xml                            |   1 -
 6 files changed, 82 insertions(+), 608 deletions(-)

diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml
deleted file mode 100644
index fe086f2..0000000
--- a/metron-platform/metron-storm-kafka-override/pom.xml
+++ /dev/null
@@ -1,165 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software
-	Foundation (ASF) under one or more contributor license agreements. See the
-	NOTICE file distributed with this work for additional information regarding
-	copyright ownership. The ASF licenses this file to You under the Apache License,
-	Version 2.0 (the "License"); you may not use this file except in compliance
-	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-	Unless required by applicable law or agreed to in writing, software distributed
-	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
-	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
-  the specific language governing permissions and limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.metron</groupId>
-        <artifactId>metron-platform</artifactId>
-        <version>0.7.2</version>
-    </parent>
-    <artifactId>metron-storm-kafka-override</artifactId>
-    <name>metron-storm-kafka-override</name>
-    <description>Components that extend the Storm/Kafka spout</description>
-    <url>https://metron.apache.org/</url>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <commons.config.version>1.10</commons.config.version>
-        <guava_version>16.0.1</guava_version>
-    </properties>
-    <dependencies>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka-client</artifactId>
-            <version>${global_storm_kafka_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${global_kafka_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${global_storm_version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                    <groupId>org.apache.logging.log4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>${global_shade_version}</version>
-                <configuration>
-                    <shadedArtifactAttached>true</shadedArtifactAttached>
-                    <shadedClassifierName>uber</shadedClassifierName>
-                    <createDependencyReducedPom>true</createDependencyReducedPom>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                          <filters>
-                            <filter>
-                              <artifact>*:*</artifact>
-                              <excludes>
-                                <exclude>META-INF/*.SF</exclude>
-                                <exclude>META-INF/*.DSA</exclude>
-                                <exclude>META-INF/*.RSA</exclude>
-                              </excludes>
-                            </filter>
-                          </filters>
-                          <relocations>
-                                <relocation>
-                                    <pattern>com.google.common</pattern>
-                                    <shadedPattern>org.apache.metron.guava.${guava_version}</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.google.thirdparty</pattern>
-                                    <shadedPattern>org.apache.metron.storm.kafka.override.guava.thirdparty</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>org.apache.commons.lang</pattern>
-                                    <shadedPattern>org.apache.metron.storm.kafka.override.commons.lang</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.fasterxml</pattern>
-                                    <shadedPattern>org.apache.metron.storm.kafka.override.com.fasterxml</shadedPattern>
-                                </relocation>
-                            </relocations>
-                            <artifactSet>
-                                <excludes>
-                                    <exclude>storm:storm-core:*</exclude>
-                                    <exclude>storm:storm-lib:*</exclude>
-                                    <exclude>org.slf4j.impl*</exclude>
-                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
-                                </excludes>
-                            </artifactSet>
-                            <transformers>
-                                <transformer
-                                  implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                                     <resources>
-                                        <resource>.yaml</resource>
-                                        <resource>LICENSE.txt</resource>
-                                        <resource>ASL2.0</resource>
-                                        <resource>NOTICE.txt</resource>
-                                      </resources>
-                                </transformer>
-                                <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
-                                <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
-                                    <addHeader>false</addHeader>
-                                    <projectName>${project.name}</projectName>
-                                </transformer-->
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass></mainClass>
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-    </build>
-</project>
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
deleted file mode 100644
index 439188b..0000000
--- a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/*
- This file is pulled from Apache Storm, with some modification to support lower version of
- Apache Storm.
-
- - Time.nanoTime() is introduced in Storm 1.1.0 so we changed to System.nanoTime()
- -- Time.nanoTime() calls System.nanoTime() when it's not in time simulation mode.
-*/
-
-/**
- * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay)
- */
-public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
-    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
-
-    private final TimeInterval initialDelay;
-    private final TimeInterval delayPeriod;
-    private final TimeInterval maxDelay;
-    private final int maxRetries;
-
-    //This class assumes that there is at most one retry schedule per message id in this set at a time.
-    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
-    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
-
-    /**
-     * Comparator ordering by timestamp 
-     */
-    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
-        @Override
-        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
-            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
-            
-            if(result == 0) {
-                //TreeSet uses compareTo instead of equals() for the Set contract
-                //Ensure that we can save two retry schedules with the same timestamp
-                result = entry1.hashCode() - entry2.hashCode();
-            }
-            return result;
-        }
-    }
-
-    private class RetrySchedule {
-        private final KafkaSpoutMessageId msgId;
-        private long nextRetryTimeNanos;
-
-        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
-            this.msgId = msgId;
-            this.nextRetryTimeNanos = nextRetryTimeNanos;
-            LOG.debug("Created {}", this);
-        }
-
-        public void setNextRetryTimeNanos() {
-            nextRetryTimeNanos = nextTime(msgId);
-            LOG.debug("Updated {}", this);
-        }
-
-        public boolean retry(long currentTimeNanos) {
-            return nextRetryTimeNanos <= currentTimeNanos;
-        }
-
-        @Override
-        public String toString() {
-            return "RetrySchedule{" +
-                    "msgId=" + msgId +
-                    ", nextRetryTimeNanos=" + nextRetryTimeNanos +
-                    '}';
-        }
-
-        public KafkaSpoutMessageId msgId() {
-            return msgId;
-        }
-
-        public long nextRetryTimeNanos() {
-            return nextRetryTimeNanos;
-        }
-    }
-
-    public static class TimeInterval implements Serializable {
-        private final long lengthNanos;
-        private final TimeUnit timeUnit;
-        private final long length;
-
-        /**
-         * @param length length of the time interval in the units specified by {@link TimeUnit}
-         * @param timeUnit unit used to specify a time interval on which to specify a time unit
-         */
-        public TimeInterval(long length, TimeUnit timeUnit) {
-            this.lengthNanos = timeUnit.toNanos(length);
-            this.timeUnit = timeUnit;
-            this.length = length;
-        }
-        
-        public static TimeInterval seconds(long length) {
-            return new TimeInterval(length, TimeUnit.SECONDS);
-        }
-
-        public static TimeInterval milliSeconds(long length) {
-            return new TimeInterval(length, TimeUnit.MILLISECONDS);
-        }
-        
-        public static TimeInterval microSeconds(long length) {
-            return new TimeInterval(length, TimeUnit.MICROSECONDS);
-        }
-        
-        public long lengthNanos() {
-            return lengthNanos;
-        }
-        
-        public TimeUnit timeUnit() {
-            return timeUnit;
-        }
-
-        @Override
-        public String toString() {
-            return "TimeInterval{" +
-                    "length=" + length +
-                    ", timeUnit=" + timeUnit +
-                    '}';
-        }
-    }
-
-    /**
-     * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
-     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
-     * nextRetry = Min(nextRetry, currentTime + maxDelay).
-     * 
-     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
-     * polled records in favor of processing more records.
-     *
-     * @param initialDelay      initial delay of the first retry
-     * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
-     * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
-     * @param maxDelay          maximum amount of time waiting before retrying
-     *
-     */
-    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
-        this.initialDelay = initialDelay;
-        this.delayPeriod = delayPeriod;
-        this.maxRetries = maxRetries;
-        this.maxDelay = maxDelay;
-        LOG.debug("Instantiated {}", this.toStringImpl());
-    }
-
-    @Override
-    public Map<TopicPartition, Long> earliestRetriableOffsets() {
-        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
-        final long currentTimeNanos = System.nanoTime();
-        for (RetrySchedule retrySchedule : retrySchedules) {
-            if (retrySchedule.retry(currentTimeNanos)) {
-                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
-                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
-                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
-                if(currentLowestOffset != null) {
-                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
-                } else {
-                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
-                }
-            } else {
-                break;  // Stop searching as soon as passed current time
-            }
-        }
-        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
-        return tpToEarliestRetriableOffset;
-    }
-
-    @Override
-    public boolean isReady(KafkaSpoutMessageId msgId) {
-        boolean retry = false;
-        if (isScheduled(msgId)) {
-            final long currentTimeNanos = System.nanoTime();
-            for (RetrySchedule retrySchedule : retrySchedules) {
-                if (retrySchedule.retry(currentTimeNanos)) {
-                    if (retrySchedule.msgId.equals(msgId)) {
-                        retry = true;
-                        LOG.debug("Found entry to retry {}", retrySchedule);
-                        break; //Stop searching if the message is known to be ready for retry
-                    }
-                } else {
-                    LOG.debug("Entry to retry not found {}", retrySchedule);
-                    break;  // Stop searching as soon as passed current time
-                }
-            }
-        }
-        return retry;
-    }
-
-    @Override
-    public boolean isScheduled(KafkaSpoutMessageId msgId) {
-        return toRetryMsgs.contains(msgId);
-    }
-
-    @Override
-    public boolean remove(KafkaSpoutMessageId msgId) {
-        boolean removed = false;
-        if (isScheduled(msgId)) {
-            toRetryMsgs.remove(msgId);
-            for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
-                final RetrySchedule retrySchedule = iterator.next();
-                if (retrySchedule.msgId().equals(msgId)) {
-                    iterator.remove();
-                    removed = true;
-                    break; //There is at most one schedule per message id
-                }
-            }
-        }
-        LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
-        LOG.trace("Current state {}", retrySchedules);
-        return removed;
-    }
-
-    @Override
-    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
-        boolean result = false;
-        for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
-            final RetrySchedule retrySchedule = rsIterator.next();
-            final KafkaSpoutMessageId msgId = retrySchedule.msgId;
-            final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
-            if (!topicPartitions.contains(tpRetry)) {
-                rsIterator.remove();
-                toRetryMsgs.remove(msgId);
-                LOG.debug("Removed {}", retrySchedule);
-                LOG.trace("Current state {}", retrySchedules);
-                result = true;
-            }
-        }
-        return result;
-    }
-
-    @Override
-    public boolean schedule(KafkaSpoutMessageId msgId) {
-        if (msgId.numFails() > maxRetries) {
-            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
-            return false;
-        } else {
-            //Remove existing schedule for the message id
-            remove(msgId);
-            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
-            retrySchedules.add(retrySchedule);
-            toRetryMsgs.add(msgId);
-            LOG.debug("Scheduled. {}", retrySchedule);
-            LOG.trace("Current state {}", retrySchedules);
-            return true;
-        }
-    }
-    
-    @Override
-    public int readyMessageCount() {
-        int count = 0;
-        final long currentTimeNanos = System.nanoTime();
-        for (RetrySchedule retrySchedule : retrySchedules) {
-            if (retrySchedule.retry(currentTimeNanos)) {
-                ++count;
-            } else {
-                break; //Stop counting when past current time
-            }
-        }
-        return count;
-    }
-
-    @Override
-    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
-        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
-        if (isScheduled(msgId)) {
-            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
-                if (originalMsgId.equals(msgId)) {
-                    return originalMsgId;
-                }
-            }
-        }
-        return msgId;
-    }
-
-    // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
-    private long nextTime(KafkaSpoutMessageId msgId) {
-        Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
-        final long currentTimeNanos = System.nanoTime();
-        final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
-                ? currentTimeNanos + initialDelay.lengthNanos
-                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
-        return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
-    }
-
-    @Override
-    public String toString() {
-        return toStringImpl();
-    }
-    
-    private String toStringImpl() {
-        //This is here to avoid an overridable call in the constructor
-        return "KafkaSpoutRetryExponentialBackoff{" +
-                "delay=" + initialDelay +
-                ", ratio=" + delayPeriod +
-                ", maxRetries=" + maxRetries +
-                ", maxRetryDelay=" + maxDelay +
-                '}';
-    }
-}
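
The retry schedule deleted above follows the exponential backoff described in its javadoc: nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod * 2^(failCount - 1), capped at currentTime + maxDelay. A minimal standalone sketch of that calculation, mirroring the shape of nextTime(msgId) in the deleted class (the interval values below are chosen purely for illustration and are not part of this patch):

import java.util.concurrent.TimeUnit;

public class BackoffSketch {
    public static void main(String[] args) {
        // Illustrative intervals; real values come from the spout's retry-service configuration.
        long initialDelayNanos = TimeUnit.MICROSECONDS.toNanos(500);
        long delayPeriodNanos  = TimeUnit.MILLISECONDS.toNanos(2);
        long maxDelayNanos     = TimeUnit.SECONDS.toNanos(10);

        long now = System.nanoTime();
        for (int failCount = 1; failCount <= 6; failCount++) {
            // Same formula as nextTime(msgId) in the deleted KafkaSpoutRetryExponentialBackoff.
            long next = failCount == 1
                    ? now + initialDelayNanos
                    : now + delayPeriodNanos * (long) Math.pow(2, failCount - 1);
            next = Math.min(next, now + maxDelayNanos);
            System.out.println("failCount=" + failCount
                    + " delayMicros=" + TimeUnit.NANOSECONDS.toMicros(next - now));
        }
    }
}

Running this prints delays of 500, 4000, 8000, 16000, ... microseconds, growing geometrically until the maxDelay cap is reached.
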
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
deleted file mode 100644
index 0b045c0..0000000
--- a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.internal;
-
-import java.util.concurrent.TimeUnit;
-
-/*
- This file is pulled from Apache Storm, with some modification to support lower version of
- Apache Storm.
-
- - Time.nanoTime() is introduced in Storm 1.1.0 so we changed to System.nanoTime()
- -- Time.nanoTime() calls System.nanoTime() when it's not in time simulation mode.
-*/
-
-public class Timer {
-  private final long delay;
-  private final long period;
-  private final TimeUnit timeUnit;
-  private final long periodNanos;
-  private long start;
-
-  public Timer(long delay, long period, TimeUnit timeUnit) {
-    this.delay = delay;
-    this.period = period;
-    this.timeUnit = timeUnit;
-    this.periodNanos = timeUnit.toNanos(period);
-    this.start = System.nanoTime() + timeUnit.toNanos(delay);
-  }
-
-  public long period() {
-    return this.period;
-  }
-
-  public long delay() {
-    return this.delay;
-  }
-
-  public TimeUnit getTimeUnit() {
-    return this.timeUnit;
-  }
-
-  public boolean isExpiredResetOnTrue() {
-    boolean expired = System.nanoTime() - this.start >= this.periodNanos;
-    if(expired) {
-      this.start = System.nanoTime();
-    }
-
-    return expired;
-  }
-}
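
The Timer removed here is a simple monotonic-clock helper: isExpiredResetOnTrue() reports whether the configured period has elapsed since the last reset and, if so, restarts the clock. A short usage sketch, assuming the upstream storm-kafka-client Timer keeps the same constructor and method shown above (as the header comment in the deleted file indicates):

import java.util.concurrent.TimeUnit;
import org.apache.storm.kafka.spout.internal.Timer;

public class TimerUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        // Zero initial delay, then a 2 second period; mirrors the behaviour exercised by the deleted TimerTest.
        Timer commitTimer = new Timer(0, 2, TimeUnit.SECONDS);
        for (int i = 1; i <= 3; i++) {
            Thread.sleep(1000);
            if (commitTimer.isExpiredResetOnTrue()) {
                System.out.println("after " + i + "s: period elapsed, timer reset for the next cycle");
            }
        }
    }
}
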
diff --git a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
deleted file mode 100644
index 0d49ae1..0000000
--- a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.internal;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-public class TimerTest {
-
-  @Test
-  public void testReset() throws InterruptedException {
-    Timer t = new Timer(0, 2, TimeUnit.SECONDS);
-    Thread.sleep(1000);
-    Assert.assertFalse(t.isExpiredResetOnTrue());
-    Thread.sleep(1000);
-    Assert.assertTrue(t.isExpiredResetOnTrue());
-  }
-
-}
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
index 17d3abc..5b601d8 100644
--- a/metron-platform/metron-storm-kafka/pom.xml
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -28,12 +28,19 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <commons.config.version>1.10</commons.config.version>
+        <!-- the version of Guava used by storm-kafka-client is < ${global_guava_version} -->
+        <guava_version>16.0.1</guava_version>
     </properties>
     <dependencies>
         <dependency>
-            <groupId>org.apache.metron</groupId>
-            <artifactId>metron-storm-kafka-override</artifactId>
-            <version>${project.parent.version}</version>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka-client</artifactId>
+            <version>${global_storm_kafka_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -77,11 +84,81 @@
             <groupId>org.apache.metron</groupId>
             <artifactId>stellar-common</artifactId>
             <version>${project.parent.version}</version>
-            <scope>provided</scope>
         </dependency>
     </dependencies>
-
     <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    <shadedArtifactAttached>true</shadedArtifactAttached>
+                    <shadedClassifierName>uber</shadedClassifierName>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
+                          <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava.${guava_version}</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.thirdparty</pattern>
+                                    <shadedPattern>org.apache.metron.storm.kafka.override.guava.thirdparty</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                  implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                     <resources>
+                                        <resource>.yaml</resource>
+                                        <resource>LICENSE.txt</resource>
+                                        <resource>ASL2.0</resource>
+                                        <resource>NOTICE.txt</resource>
+                                      </resources>
+                                </transformer>
+                                <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
+                                <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                    <addHeader>false</addHeader>
+                                    <projectName>${project.name}</projectName>
+                                </transformer-->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>
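
The shade configuration added here relocates the bundled Guava classes (com.google.common) under an org.apache.metron.guava.${guava_version} prefix, so the uber jar cannot clash with whatever Guava version is already on Storm's classpath. A hedged sketch for spot-checking the relocation in the built artifact; the jar path is illustrative and not taken from this patch:

import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ShadeCheckSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative path; the actual artifact name depends on the module version and build.
        try (JarFile jar = new JarFile("target/metron-storm-kafka-0.7.2-uber.jar")) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                // Relocated Guava classes should appear under the org/apache/metron/guava/ prefix.
                if (name.startsWith("org/apache/metron/guava/")) {
                    System.out.println(name);
                }
            }
        }
    }
}
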
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 1f028b5..562204c 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -58,7 +58,6 @@
 		<module>elasticsearch-shaded</module>
 		<module>metron-elasticsearch</module>
 		<module>metron-storm-kafka</module>
-		<module>metron-storm-kafka-override</module>
 		<module>metron-zookeeper</module>
     <module>metron-parsing</module>
     <module>metron-hbase-server</module>