You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/09/28 20:38:08 UTC

[04/11] storm git commit: initial SQE import

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java
new file mode 100644
index 0000000..1e08781
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class ExpandKeys extends TransformExpression {
+    public ExpandKeys() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.ExpandKeys(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "expandkeys";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java
new file mode 100644
index 0000000..9c3d331
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class ExpandValues extends TransformExpression {
+    public ExpandValues() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.ExpandValues(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "expandvalues";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java
new file mode 100644
index 0000000..dace3f5
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class FormatDate extends TransformExpression {
+    public FormatDate() {
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "formatdate";
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.FormatDate(), getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java
new file mode 100644
index 0000000..2d50425
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class GetMapValue extends TransformExpression{
+    public GetMapValue() {
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "getmapvalue";
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.GetMapValue(), getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java
new file mode 100644
index 0000000..747a71a
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class GetTime extends TransformExpression {
+    public GetTime() {
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "gettime";
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.GetTime(), getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java
new file mode 100644
index 0000000..f354a5f
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class Hash extends TransformExpression {
+    public Hash() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.Hash(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "hash";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java
new file mode 100644
index 0000000..fe61093
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import com.jwplayer.sqe.trident.function.GetIf;
+import org.apache.storm.trident.Stream;
+
+
+public class If extends TransformExpression {
+    public If() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new GetIf(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "if";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java
new file mode 100644
index 0000000..86b077f
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+public class ModulusArithmeticOperator extends ArithmeticOperator {
+    public ModulusArithmeticOperator() {
+        super(ArithmeticOperatorType.Modulus);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "%";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java
new file mode 100644
index 0000000..232cedb
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+public class MultiplicationArithmeticOperator extends ArithmeticOperator {
+    public MultiplicationArithmeticOperator() {
+        super(ArithmeticOperatorType.Multiplication);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "*";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java
new file mode 100644
index 0000000..be74931
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class ParseDate extends TransformExpression {
+    public ParseDate() {
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "parsedate";
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.ParseDate(), getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java
new file mode 100644
index 0000000..7268bea
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+public class PlusArithmeticOperator extends ArithmeticOperator {
+    public PlusArithmeticOperator() {
+        super(ArithmeticOperatorType.Addition);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "+";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java
new file mode 100644
index 0000000..5f4c02b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+public class RoundDate extends TransformExpression {
+    public RoundDate() {
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "rounddate";
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new com.jwplayer.sqe.trident.function.RoundDate(), getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java
new file mode 100644
index 0000000..c2d0c95
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+public class SubtractionArithmeticOperator extends ArithmeticOperator {
+    public SubtractionArithmeticOperator() {
+        super(ArithmeticOperatorType.Subtraction);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "-";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java
new file mode 100644
index 0000000..2d4bb8c
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java
@@ -0,0 +1,18 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import com.jwplayer.sqe.language.expression.FunctionExpression;
+import com.jwplayer.sqe.language.expression.FunctionType;
+import org.apache.storm.trident.Stream;
+
+
+public abstract class TransformExpression extends FunctionExpression {
+    public TransformExpression() {
+    }
+
+    @Override
+    public FunctionType getFunctionType() {
+        return FunctionType.Transform;
+    }
+
+    public abstract Stream transform(Stream stream);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java
new file mode 100644
index 0000000..b8f82fa
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class AndLogicalOperator extends LogicalOperator {
+    public AndLogicalOperator() {
+        super(LogicalOperatorType.And);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "and";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java
new file mode 100644
index 0000000..d7e186b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java
@@ -0,0 +1,21 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import org.apache.storm.trident.Stream;
+
+
+public abstract class CompareNumbers extends PredicateExpression {
+    private NumberComparisonType comparisonType = null;
+
+    public CompareNumbers(NumberComparisonType comparisonType) {
+        super();
+        this.comparisonType = comparisonType;
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        com.jwplayer.sqe.trident.function.CompareNumbers function =
+                new com.jwplayer.sqe.trident.function.CompareNumbers(comparisonType);
+
+        return stream.each(getInputFields(), function, getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java
new file mode 100644
index 0000000..7b6c123
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.CompareObjects;
+import org.apache.storm.trident.Stream;
+
+
+public class Equals extends PredicateExpression {
+    public Equals() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new CompareObjects(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "=";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java
new file mode 100644
index 0000000..8431754
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class GreaterThanCompareNumbers extends CompareNumbers {
+    public GreaterThanCompareNumbers() {
+        super(NumberComparisonType.GreaterThan);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return ">";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java
new file mode 100644
index 0000000..7663e44
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class GreaterThanEqualCompareNumbers extends CompareNumbers {
+    public GreaterThanEqualCompareNumbers() {
+        super(NumberComparisonType.GreaterThanOrEqual);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return ">=";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java
new file mode 100644
index 0000000..4f9f771
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.IsIn;
+import org.apache.storm.trident.Stream;
+
+
+public class In extends PredicateExpression {
+    public In() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new IsIn(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "in";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java
new file mode 100644
index 0000000..d1e7223
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class LessThanCompareNumbers extends CompareNumbers {
+    public LessThanCompareNumbers() {
+        super(NumberComparisonType.LessThan);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "<";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java
new file mode 100644
index 0000000..a8ce47c
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class LessThanEqualCompareNumbers extends CompareNumbers {
+    public LessThanEqualCompareNumbers() {
+        super(NumberComparisonType.LessThanOrEqual);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "<=";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java
new file mode 100644
index 0000000..f1c64aa
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.ProcessLogicalOperator;
+import org.apache.storm.trident.Stream;
+
+
+public abstract class LogicalOperator extends PredicateExpression {
+    LogicalOperatorType operatorType;
+
+    public LogicalOperator(LogicalOperatorType operatorType) {
+        super();
+        this.operatorType = operatorType;
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new ProcessLogicalOperator(operatorType), getOutputFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java
new file mode 100644
index 0000000..cf3e20c
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java
@@ -0,0 +1,8 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+public enum LogicalOperatorType {
+    And,
+    Not,
+    Or,
+    Xor
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java
new file mode 100644
index 0000000..f4ca8c6
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class NotEqualCompareNumbers extends CompareNumbers {
+    public NotEqualCompareNumbers() {
+        super(NumberComparisonType.NotEqual);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "!=";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java
new file mode 100644
index 0000000..0925144
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class NotLogicalOperator extends LogicalOperator {
+    public NotLogicalOperator() {
+        super(LogicalOperatorType.Not);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "not";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java
new file mode 100644
index 0000000..581cf40
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java
@@ -0,0 +1,10 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+public enum NumberComparisonType {
+    Equal,
+    GreaterThan,
+    GreaterThanOrEqual,
+    LessThan,
+    LessThanOrEqual,
+    NotEqual
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java
new file mode 100644
index 0000000..80e24d3
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class OrLogicalOperator extends LogicalOperator {
+    public OrLogicalOperator() {
+        super(LogicalOperatorType.Or);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "or";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java
new file mode 100644
index 0000000..8523406
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java
@@ -0,0 +1,9 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.language.expression.transform.TransformExpression;
+
+
+public abstract class PredicateExpression extends TransformExpression {
+    public PredicateExpression() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java
new file mode 100644
index 0000000..a4debed
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.EvaluateRegularExpression;
+import org.apache.storm.trident.Stream;
+
+
+public class RLike extends PredicateExpression {
+    public RLike() {
+    }
+
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(getInputFields(), new EvaluateRegularExpression(), getOutputFields());
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "rlike";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java
new file mode 100644
index 0000000..ae722ce
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+public class XorLogicalOperator extends LogicalOperator {
+    public XorLogicalOperator() {
+        super(LogicalOperatorType.Xor);
+    }
+
+    @Override
+    public String getFunctionName() {
+        return "xor";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java
new file mode 100644
index 0000000..720ce2d
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java
@@ -0,0 +1,25 @@
+package com.jwplayer.sqe.language.serde;
+
+import com.jwplayer.sqe.language.serde.avro.AvroDeserializer;
+import com.jwplayer.sqe.language.serde.avro.AvroDeserializerOptions;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Map;
+
+
+public abstract class BaseDeserializer {
+    public abstract Stream deserialize(Stream stream, Fields requiredFields);
+
+    public static BaseDeserializer makeDeserializer(String deserializerName, Map deserializerOptions) {
+        switch(deserializerName.toLowerCase()) {
+            case "avro":
+                AvroDeserializerOptions avroOptions = AvroDeserializerOptions.parse(deserializerOptions);
+                return new AvroDeserializer(avroOptions);
+            case "identity":
+                return new IdentityDeserializer();
+            default:
+                throw new RuntimeException(deserializerName + " is not a supported deserializer");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java
new file mode 100644
index 0000000..76fc262
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java
@@ -0,0 +1,16 @@
+package com.jwplayer.sqe.language.serde;
+
+
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+public class IdentityDeserializer extends BaseDeserializer {
+    public void IdentityDeserializer() {
+
+    }
+
+    @Override
+    public Stream deserialize(Stream stream, Fields requiredFields) {
+        return stream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java
new file mode 100644
index 0000000..f5868f4
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java
@@ -0,0 +1,21 @@
+package com.jwplayer.sqe.language.serde.avro;
+
+import com.jwplayer.sqe.language.serde.BaseDeserializer;
+import com.jwplayer.sqe.trident.function.GetTupleFromAvro;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+
+public class AvroDeserializer extends BaseDeserializer {
+    AvroDeserializerOptions options;
+
+    public AvroDeserializer(AvroDeserializerOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public Stream deserialize(Stream stream, Fields requiredFields) {
+        return stream
+                .each(stream.getOutputFields(), new GetTupleFromAvro(options.schemaName, requiredFields), requiredFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java
new file mode 100644
index 0000000..3abd155
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java
@@ -0,0 +1,16 @@
+package com.jwplayer.sqe.language.serde.avro;
+
+import java.util.Map;
+
+public class AvroDeserializerOptions {
+    public String schemaName = null;
+
+    public static AvroDeserializerOptions parse(Map map) {
+        AvroDeserializerOptions options = new AvroDeserializerOptions();
+
+        if(map.containsKey("jw.sqe.spout.deserializer.avro.schemaname"))
+            options.schemaName = (String) map.get("jw.sqe.spout.deserializer.avro.schemaname");
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java
new file mode 100644
index 0000000..0483a5b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java
@@ -0,0 +1,41 @@
+package com.jwplayer.sqe.language.state;
+
+import com.jwplayer.sqe.language.state.kafka.KafkaStateAdapter;
+import com.jwplayer.sqe.language.state.kafka.KafkaStateOptions;
+import com.jwplayer.sqe.language.state.mongodb.MongoDBStateAdapter;
+import com.jwplayer.sqe.language.state.redis.RedisStateAdapter;
+import com.jwplayer.sqe.language.state.redis.RedisStateOptions;
+import com.jwplayer.sqe.trident.state.mongodb.MongoDBStateOptions;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+import java.util.Map;
+
+
+public abstract class StateAdapter {
+    public abstract StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType);
+
+    public TridentState partitionPersist(Stream stream, StateFactory stateFactory, Fields keyFields) {
+        throw new UnsupportedOperationException("This adapter does not support partition persist");
+    }
+
+    public static StateAdapter makeAdapter(String stateName, Map options) {
+        switch (stateName) {
+            case "mongo":
+                MongoDBStateOptions mongoDBStateOptions = MongoDBStateOptions.parse(options);
+                return new MongoDBStateAdapter(mongoDBStateOptions);
+            case "redis":
+                RedisStateOptions redisStateOptions = RedisStateOptions.parse(options);
+                return new RedisStateAdapter(redisStateOptions);
+            case "kafka":
+                KafkaStateOptions kafkaStateOptions = KafkaStateOptions.parse(options);
+                return new KafkaStateAdapter(kafkaStateOptions);
+            default:
+                throw new RuntimeException("Unknown state name: " + stateName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java
new file mode 100644
index 0000000..4b56fd2
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java
@@ -0,0 +1,6 @@
+package com.jwplayer.sqe.language.state;
+
+public enum StateOperationType {
+    AGGREGATE,
+    NONAGGREGATE
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java
new file mode 100644
index 0000000..9cc927e
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java
@@ -0,0 +1,75 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.task.IMetricsContext;
+
+
+public class JwTridentKafkaStateFactory implements StateFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JwTridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+    public KafkaStateOptions kafka_options;
+
+    public JwTridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public JwTridentKafkaStateFactory withKafkaOptions(KafkaStateOptions options) {
+        kafka_options = options;
+        return this;
+    }
+
+    public JwTridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
+
+        TridentKafkaState state = new TridentKafkaState()
+                .withKafkaTopicSelector(this.topicSelector)
+                .withTridentTupleToKafkaMapper(this.mapper);
+        Properties kafkaConf = new Properties();
+        for(Map.Entry entry: (Set<Map.Entry>) this.getconf(conf).entrySet()) {
+            if(entry.getValue() != null) {
+                kafkaConf.put(entry.getKey(), entry.getValue());
+            }
+        }
+        state.prepare(kafkaConf);
+        return state;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map getconf(Map conf) {
+        KafkaStateOptions sqeKafkaOptions = kafka_options;
+        Map kafkaConf = new HashMap();
+        kafkaConf.putAll(conf);
+        // Storm 1.0 uses the Kafka 0.9 producer, which has different names for the configs
+        kafkaConf.put("bootstrap.servers", sqeKafkaOptions.metadataBrokerList);
+        kafkaConf.put("acks", sqeKafkaOptions.requestRequiredAcks);
+        kafkaConf.put("producer.type",  sqeKafkaOptions.producerType);
+        kafkaConf.put("key.serializer",sqeKafkaOptions.KeyserializerClass);
+        kafkaConf.put("value.serializer", sqeKafkaOptions.serializerClass);
+        kafkaConf.put("partitioner.class", sqeKafkaOptions.partitionClass);
+
+        return kafkaConf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java
new file mode 100644
index 0000000..71ac874
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java
@@ -0,0 +1,76 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import com.jwplayer.sqe.language.state.StateAdapter;
+import com.jwplayer.sqe.language.state.StateOperationType;
+import com.jwplayer.sqe.language.stream.StreamAdapter;
+import com.jwplayer.sqe.trident.function.ConvertMetadataToBytes;
+import com.jwplayer.sqe.trident.function.Hash;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.tuple.Fields;
+
+
+public class KafkaStateAdapter extends StateAdapter {
+    public static final String KAFKA_KEY = "_kafka_state_key";
+    protected KafkaStateOptions options ;
+
+    public KafkaStateAdapter(KafkaStateOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType) {
+        return getTridentStateFactory(objectName);
+    }
+
+    protected JwTridentKafkaStateFactory getTridentStateFactory(String topic) {
+        JwTridentKafkaStateFactory factory = new JwTridentKafkaStateFactory();
+        factory.withKafkaOptions(options).withKafkaTopicSelector(new DefaultTopicSelector(topic));
+
+        return factory;
+    }
+
+    public TridentState partitionPersist(Stream stream, StateFactory stateFactory, Fields keyFields) {
+        JwTridentKafkaStateFactory factory = (JwTridentKafkaStateFactory) stateFactory;
+        List<String> persistFields = new ArrayList<>();
+        for(String fieldName: keyFields) persistFields.add(fieldName);
+
+        if(options.KeyType.equals("field")) {
+            factory = factory.withTridentTupleToKafkaMapper(
+                    new FieldNameBasedTupleToKafkaMapper<byte[], byte[]>(keyFields.get(0), keyFields.get(1)));
+        } else {
+            factory = factory.withTridentTupleToKafkaMapper(
+                    new FieldNameBasedTupleToKafkaMapper<byte[], byte[]>(KafkaStateAdapter.KAFKA_KEY, keyFields.get(0)));
+            persistFields.add(KafkaStateAdapter.KAFKA_KEY);
+        }
+
+        switch (options.KeyType) {
+            case "field":
+                break;
+            case "messagehash":
+                stream = stream.each(new Fields(keyFields.get(0)), new Hash(), new Fields(KafkaStateAdapter.KAFKA_KEY));
+                break;
+            case "streammetadata":
+                stream = stream.each(
+                        new Fields(StreamAdapter.STREAM_METADATA_FIELD),
+                        new ConvertMetadataToBytes(),
+                        new Fields(KafkaStateAdapter.KAFKA_KEY)
+                );
+                break;
+            default:
+                throw new IllegalArgumentException(options.KeyType + " is not a supported key type");
+        }
+
+        return stream.partitionPersist(factory, new Fields(persistFields), new TridentKafkaUpdater());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java
new file mode 100644
index 0000000..9093c11
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java
@@ -0,0 +1,51 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+import com.google.common.base.Joiner;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaStateOptions implements Serializable {
+    public String serializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
+    public String metadataBrokerList;
+    public String producerType = "async";
+    public String requestRequiredAcks = "1";
+    public String partitionClass = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
+    public String KeyserializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
+    public String KeyType = "messagehash";
+
+    @SuppressWarnings("unchecked")
+    public static KafkaStateOptions parse(Map map) {
+        KafkaStateOptions options = new KafkaStateOptions();
+
+        if (map.containsKey("jw.sqe.state.kafka.brokers"))
+            options.metadataBrokerList =
+                    Joiner.on(',').join((List<String>) map.get("jw.sqe.state.kafka.brokers"));
+
+        if (map.containsKey("jw.sqe.state.kafka.serializerClass"))
+            options.serializerClass =
+                    (String) map.get("jw.sqe.state.kafka.serializerClass");
+
+        if (map.containsKey("jw.sqe.state.kafka.key.serializerClass"))
+            options.KeyserializerClass =
+                    (String) map.get("jw.sqe.state.kafka.key.serializerClass");
+
+        if (map.containsKey("jw.sqe.state.kafka.partitionClass"))
+            options.partitionClass =
+                    (String) map.get("jw.sqe.state.kafka.partitionClass");
+
+        if (map.containsKey("jw.sqe.state.kafka.producerType"))
+            options.producerType =
+                    (String) map.get("jw.sqe.state.kafka.producerType");
+
+        if (map.containsKey("jw.sqe.state.kafka.request.requiredAcks"))
+            options.requestRequiredAcks =
+                    (String) map.get("jw.sqe.state.kafka.request.requiredAcks");
+
+        if (map.containsKey("jw.sqe.state.kafka.keytype"))
+            options.KeyType = ((String) map.get("jw.sqe.state.kafka.keytype")).toLowerCase();
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java
new file mode 100644
index 0000000..38074b2
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java
@@ -0,0 +1,34 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+
+import com.jwplayer.sqe.trident.StreamMetadata;
+
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class SourcePartitionPartitioner implements Partitioner {
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+        int numPartitions = partitions.size();
+        StreamMetadata metadata = StreamMetadata.parseBytes(keyBytes);
+        int sourcePartition = metadata.partition;
+
+        return sourcePartition % numPartitions;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java
new file mode 100644
index 0000000..6ef1c88
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java
@@ -0,0 +1,31 @@
+package com.jwplayer.sqe.language.state.mongodb;
+
+import com.jwplayer.sqe.language.state.StateAdapter;
+import com.jwplayer.sqe.language.state.StateOperationType;
+import com.jwplayer.sqe.trident.state.mongodb.MongoDBState;
+import com.jwplayer.sqe.trident.state.mongodb.MongoDBStateOptions;
+
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+
+import java.util.List;
+
+public class MongoDBStateAdapter extends StateAdapter {
+    private MongoDBStateOptions options;
+
+    public MongoDBStateAdapter(MongoDBStateOptions options) { this.options = options; }
+
+    @Override
+    public StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType) {
+        switch(stateType) {
+            case NON_TRANSACTIONAL:
+                return MongoDBState.nonTransactional(objectName, keyFields, valueField, options);
+            case OPAQUE:
+                return MongoDBState.opaque(objectName, keyFields, valueField, options);
+            case TRANSACTIONAL:
+                return MongoDBState.transactional(objectName, keyFields, valueField, options);
+            default:
+                throw new RuntimeException("Unknown state type: " + stateType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java
new file mode 100644
index 0000000..986878b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java
@@ -0,0 +1,142 @@
+package com.jwplayer.sqe.language.state.redis;
+
+import com.jwplayer.sqe.trident.state.GsonOpaqueSerializer;
+import com.jwplayer.sqe.trident.state.GsonTransactionalSerializer;
+import com.jwplayer.sqe.language.state.StateAdapter;
+import com.jwplayer.sqe.language.state.StateOperationType;
+import com.jwplayer.sqe.trident.state.redis.RedisKeyFactory;
+
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.Options;
+import org.apache.storm.redis.trident.state.RedisDataTypeDescription;
+import org.apache.storm.redis.trident.state.RedisMapState;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class RedisStateAdapter extends StateAdapter {
+    protected RedisStateOptions options;
+
+    public RedisStateAdapter(RedisStateOptions options) { this.options = options; }
+
+    @Override
+    public TridentState partitionPersist(Stream stream, StateFactory stateFactory, Fields keyFields) {
+        RedisStateFactory factory = (RedisStateFactory) stateFactory;
+        RedisStateUpdater updater = new RedisStateUpdater(new StoreMapper(this.options,
+                                                          factory.objectName, factory.streamFields));
+        updater.setExpireInterval(options.expireintervalsec);
+        return stream.partitionPersist(stateFactory, keyFields, updater);
+    }
+
+    private static class StoreMapper implements RedisStoreMapper {
+        private static final long serialVersionUID = 1L;
+        private RedisKeyFactory keyStringFactory;
+        private RedisKeyFactory valueStringFactory;
+        private org.apache.storm.redis.common.mapper.RedisDataTypeDescription description;
+
+        public StoreMapper(RedisStateOptions options, String objectName, List<String> streamFields) {
+            // No additional key since not aggregating
+            description = new org.apache.storm.redis.common.mapper.RedisDataTypeDescription(
+                    org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SET, 
+                    objectName);
+
+            List<Integer> keyNameFieldIndexes = new ArrayList<>();
+            List<Integer> fieldNameFieldIndexes = new ArrayList<>();
+
+            for(int i = 0; i < streamFields.size(); i++) {
+                if(options.keyNameFields.contains(streamFields.get(i))) keyNameFieldIndexes.add(i);
+                if(options.fieldNameFields.contains(streamFields.get(i))) fieldNameFieldIndexes.add(i);
+            }
+            keyStringFactory = new RedisKeyFactory(options.delimiter, keyNameFieldIndexes, "", "");
+            valueStringFactory = new RedisKeyFactory(options.delimiter, fieldNameFieldIndexes, "", "");
+        }
+
+        @Override
+        public org.apache.storm.redis.common.mapper.RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return keyStringFactory.build(tuple.getValues());
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return valueStringFactory.build(tuple.getValues());
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType) {
+        JedisPoolConfig poolConfig =
+                new JedisPoolConfig.Builder()
+                        .setDatabase(options.database)
+                        .setHost(options.host)
+                        .setPort(options.port)
+                        .setTimeout(15000)
+                        .build();
+        Options rOptions = new Options();
+
+        // Build the Redis options based on configuration from SQE
+        switch(options.dataType) {
+            case STRING:
+                List<Integer> indexes = new ArrayList<>();
+
+                for(int i = 0; i < keyFields.size(); i++) indexes.add(i);
+
+                rOptions.keyFactory = new RedisKeyFactory(options.delimiter, indexes, objectName, valueField);
+                rOptions.dataTypeDescription = new RedisDataTypeDescription(options.dataType);
+                break;
+            case HASH:
+            case SET:
+                List<Integer> keyNameFieldIndexes = new ArrayList<>();
+                List<Integer> fieldNameFieldIndexes = new ArrayList<>();
+
+                for(int i = 0; i < keyFields.size(); i++) {
+                    if(options.keyNameFields.contains(keyFields.get(i))) keyNameFieldIndexes.add(i);
+                    if(options.fieldNameFields.contains(keyFields.get(i))) fieldNameFieldIndexes.add(i);
+                }
+
+                rOptions.keyFactory = new RedisKeyFactory(options.delimiter, keyNameFieldIndexes, objectName, "");
+                rOptions.dataTypeDescription =
+                        new RedisDataTypeDescription(options.dataType,
+                                new RedisKeyFactory(options.delimiter, fieldNameFieldIndexes, "", valueField));
+                break;
+            case LIST:
+            case SORTED_SET:
+            case HYPER_LOG_LOG:
+                throw new IllegalArgumentException("Unsupported Redis data type: " + options.dataType);
+        }
+
+        // Set TTL for the keys
+        rOptions.expireIntervalSec = options.expireintervalsec;
+
+        if (stateOperationType == StateOperationType.NONAGGREGATE){
+            return new RedisStateFactory(poolConfig, objectName, keyFields);
+        }
+        // Return the appropriate state factory
+        switch(stateType) {
+            case NON_TRANSACTIONAL:
+                return RedisMapState.nonTransactional(poolConfig, rOptions);
+            case OPAQUE:
+                rOptions.serializer = new GsonOpaqueSerializer();
+                return RedisMapState.opaque(poolConfig, rOptions);
+            case TRANSACTIONAL:
+                rOptions.serializer = new GsonTransactionalSerializer();
+                return RedisMapState.transactional(poolConfig, rOptions);
+            default:
+                throw new RuntimeException("Unknown state type: " + stateType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java
new file mode 100644
index 0000000..9faa980
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java
@@ -0,0 +1,29 @@
+package com.jwplayer.sqe.language.state.redis;
+
+import java.util.List;
+
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.trident.state.RedisState;
+
+/**
+ * Similar to RedisState.Factory but also allows storing an objectName,
+ * used to name-space hash data in Redis
+ */
+public class RedisStateFactory extends RedisState.Factory{
+    private static final long serialVersionUID = 1L;
+    public List<String> streamFields;
+    public String objectName;
+
+
+    /**
+     * Store objectName for persisting state to Hash (additional key)
+     * @param config
+     * @param objectName
+     */
+    public RedisStateFactory(JedisPoolConfig config, String objectName, List<String> streamFields) {
+        super(config);
+        this.objectName = objectName;
+        this.streamFields = streamFields;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java
new file mode 100644
index 0000000..3699480
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java
@@ -0,0 +1,43 @@
+package com.jwplayer.sqe.language.state.redis;
+
+import org.apache.storm.redis.trident.state.RedisDataTypeDescription.RedisDataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RedisStateOptions implements Serializable {
+    public int database = 0;
+    public RedisDataType dataType = RedisDataType.STRING;
+    public String delimiter = ":";
+    public List<String> fieldNameFields = new ArrayList<>();
+    public String host = "";
+    public List<String> keyNameFields = new ArrayList<>();
+    public int port = 6379;
+    public int expireintervalsec = 0;
+
+    @SuppressWarnings("unchecked")
+    public static RedisStateOptions parse(Map map) {
+        RedisStateOptions options = new RedisStateOptions();
+
+        if(map.containsKey("jw.sqe.state.redis.database"))
+            options.database = ((Number) map.get("jw.sqe.state.redis.database")).intValue();
+        if(map.containsKey("jw.sqe.state.redis.datatype"))
+            options.dataType = RedisDataType.valueOf((String) map.get("jw.sqe.state.redis.datatype"));
+        if(map.containsKey("jw.sqe.state.redis.delimiter"))
+            options.delimiter = (String) map.get("jw.sqe.state.redis.delimiter");
+        if(map.containsKey("jw.sqe.state.redis.expireintervalsec"))
+            options.expireintervalsec = ((Number)map.get("jw.sqe.state.redis.expireintervalsec")).intValue();
+        if(map.containsKey("jw.sqe.state.redis.fieldname.fields"))
+            options.fieldNameFields = (List<String>) map.get("jw.sqe.state.redis.fieldname.fields");
+        if(map.containsKey("jw.sqe.state.redis.host"))
+            options.host = (String) map.get("jw.sqe.state.redis.host");
+        if(map.containsKey("jw.sqe.state.redis.keyname.fields"))
+            options.keyNameFields = (List<String>) map.get("jw.sqe.state.redis.keyname.fields");
+        if(map.containsKey("jw.sqe.state.redis.port"))
+            options.port = Integer.parseInt((String) map.get("jw.sqe.state.redis.port"));
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java
new file mode 100644
index 0000000..7bebbd6
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java
@@ -0,0 +1,6 @@
+package com.jwplayer.sqe.language.stream;
+
+public enum InputStreamType {
+    BinaryAvro,
+    Tuple
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java
new file mode 100644
index 0000000..f43c35a
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java
@@ -0,0 +1,36 @@
+package com.jwplayer.sqe.language.stream;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.jwplayer.sqe.language.stream.kafka.KafkaStreamAdapter;
+import com.jwplayer.sqe.language.stream.kafka.KafkaStreamAdapterOptions;
+import com.jwplayer.sqe.language.stream.testing.FixedBatchSpoutOptions;
+import com.jwplayer.sqe.language.stream.testing.FixedBatchSpoutStreamAdapter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateType;
+
+import java.util.Map;
+
+
+public abstract class StreamAdapter {
+    public abstract Stream makeStream(TridentTopology topology, String topologyName, String streamName, String objectName, StateType spoutType);
+
+    public static final String STREAM_METADATA_FIELD = "_stream_metadata";
+
+    public static Long createPid(String topologyName, String streamName, String secondaryID) {
+        return MurmurHash.hash64(topologyName + streamName + secondaryID);
+    }
+
+    public static StreamAdapter makeAdapter(String spoutName, Map options) {
+        switch(spoutName.toLowerCase()) {
+            case "fixed":
+                FixedBatchSpoutOptions fixedOptions = FixedBatchSpoutOptions.parse(options);
+                return new FixedBatchSpoutStreamAdapter(fixedOptions);
+            case "kafka":
+                KafkaStreamAdapterOptions kafkaOptions = KafkaStreamAdapterOptions.parse(options);
+                return new KafkaStreamAdapter(kafkaOptions);
+            default:
+                throw new RuntimeException("Unknown spout name: " + spoutName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java
new file mode 100644
index 0000000..8f15ac1
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java
@@ -0,0 +1,15 @@
+package com.jwplayer.sqe.language.stream;
+
+import org.apache.storm.trident.Stream;
+
+public class StreamContainer {
+    public InputStreamType inputStreamType;
+    public Stream stream;
+    public String streamName;
+
+    public StreamContainer(InputStreamType inputStreamType, Stream stream, String streamName) {
+        this.inputStreamType = inputStreamType;
+        this.stream = stream;
+        this.streamName = streamName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java
new file mode 100644
index 0000000..e3e5c91
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java
@@ -0,0 +1,52 @@
+package com.jwplayer.sqe.language.stream.kafka;
+
+import com.jwplayer.sqe.language.stream.StreamAdapter;
+import com.jwplayer.sqe.trident.spout.kafka.FilteredOpaqueTridentKafkaSpout;
+import com.jwplayer.sqe.trident.spout.kafka.FilteredTransactionalTridentKafkaSpout;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateType;
+
+
+public class KafkaStreamAdapter extends StreamAdapter {
+    KafkaStreamAdapterOptions options;
+
+    public KafkaStreamAdapter(KafkaStreamAdapterOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public Stream makeStream(TridentTopology topology, String topologyName, String streamName, String objectName, StateType spoutType) {
+        String txID = topologyName + "/" + streamName;
+
+        if (options.filterReplays) {
+            switch (spoutType) {
+                case TRANSACTIONAL:
+                    FilteredTransactionalTridentKafkaSpout tSpout =
+                            new FilteredTransactionalTridentKafkaSpout(options.getKafkaConfig(topologyName, streamName, objectName), options.filterReplaysMetadataTtl);
+                    return topology.newStream(txID, tSpout);
+                case OPAQUE:
+                    FilteredOpaqueTridentKafkaSpout oSpout =
+                            new FilteredOpaqueTridentKafkaSpout(options.getKafkaConfig(topologyName, streamName, objectName), options.filterReplaysMetadataTtl);
+                    return topology.newStream(txID, oSpout);
+                default:
+                    throw new RuntimeException(spoutType.toString() + " is not a supported state type");
+            }
+        } else {
+            switch (spoutType) {
+                case TRANSACTIONAL:
+                    TransactionalTridentKafkaSpout tSpout =
+                            new TransactionalTridentKafkaSpout(options.getKafkaConfig(topologyName, streamName, objectName));
+                    return topology.newStream(txID, tSpout);
+                case OPAQUE:
+                    OpaqueTridentKafkaSpout oSpout =
+                            new OpaqueTridentKafkaSpout(options.getKafkaConfig(topologyName, streamName, objectName));
+                    return topology.newStream(txID, oSpout);
+                default:
+                    throw new RuntimeException(spoutType.toString() + " is not a supported state type");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java
new file mode 100644
index 0000000..2c70053
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java
@@ -0,0 +1,54 @@
+package com.jwplayer.sqe.language.stream.kafka;
+
+import com.google.common.base.Joiner;
+import com.jwplayer.sqe.trident.spout.kafka.SqeRawFullScheme;
+import org.apache.storm.kafka.FullSchemeAsMultiScheme;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+
+public class KafkaStreamAdapterOptions implements Serializable {
+    Integer bufferSizeBytes = null;
+    String clientID;
+    Integer fetchSizeBytes = null;
+    Boolean filterReplays = false;
+    Long filterReplaysMetadataTtl = 60L * 60L * 24L * 2L;
+    Long maxOffsetBehind = null;
+    ZkHosts zkHosts;
+
+    public TridentKafkaConfig getKafkaConfig(String topologyName, String streamName, String topic) {
+        TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic, clientID);
+        if(bufferSizeBytes != null) config.bufferSizeBytes = bufferSizeBytes;
+        if(fetchSizeBytes != null) config.fetchSizeBytes = fetchSizeBytes;
+        if(maxOffsetBehind != null) config.maxOffsetBehind = maxOffsetBehind;
+        config.scheme = new FullSchemeAsMultiScheme(
+                new SqeRawFullScheme(topologyName, streamName, zkHosts)
+        );
+
+        return config;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static KafkaStreamAdapterOptions parse(Map map) {
+        KafkaStreamAdapterOptions options = new KafkaStreamAdapterOptions();
+        options.zkHosts = new ZkHosts(Joiner.on(',').join((List<String>) map.get("jw.sqe.spout.kafka.zkhosts")));
+        options.clientID = (String) map.get("jw.sqe.spout.kafka.clientid");
+
+        if(map.containsKey("jw.sqe.spout.kafka.bufferSizeBytes"))
+            options.bufferSizeBytes = (int) map.get("jw.sqe.spout.kafka.bufferSizeBytes");
+        if(map.containsKey("jw.sqe.spout.kafka.fetchSizeBytes"))
+            options.fetchSizeBytes = (int) map.get("jw.sqe.spout.kafka.fetchSizeBytes");
+        if(map.containsKey("jw.sqe.spout.kafka.filterReplays"))
+            options.filterReplays = (boolean) map.get("jw.sqe.spout.kafka.filterReplays");
+        if(map.containsKey("jw.sqe.spout.kafka.filterReplays.metadata.ttl"))
+            options.filterReplaysMetadataTtl = (long) map.get("jw.sqe.spout.kafka.filterReplays.metadata.ttl");
+        if(map.containsKey("jw.sqe.spout.kafka.maxOffsetBehind"))
+            options.maxOffsetBehind = (long) map.get("jw.sqe.spout.kafka.maxOffsetBehind");
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java
new file mode 100644
index 0000000..defb4db
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java
@@ -0,0 +1,33 @@
+package com.jwplayer.sqe.language.stream.testing;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class FixedBatchSpoutOptions implements Serializable {
+    public Fields fields;
+    public List<List<Object>> values;
+
+    @SuppressWarnings("unchecked")
+    public static FixedBatchSpoutOptions parse(Map map) {
+        FixedBatchSpoutOptions options = new FixedBatchSpoutOptions();
+
+        if(map.containsKey("jw.sqe.spout.fixed.fields")) options.fields =
+                new Fields((List) map.get("jw.sqe.spout.fixed.fields"));
+        options.values = new LinkedList<>();
+        if(map.containsKey("jw.sqe.spout.fixed.values")) {
+            List<List<Object>> rawValues = (List) map.get("jw.sqe.spout.fixed.values");
+
+            for(List<Object> rawValue: rawValues) {
+                options.values.add(new Values(rawValue.toArray()));
+            }
+        }
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java
new file mode 100644
index 0000000..310763e
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java
@@ -0,0 +1,33 @@
+package com.jwplayer.sqe.language.stream.testing;
+
+import com.jwplayer.sqe.language.stream.StreamAdapter;
+import com.jwplayer.sqe.trident.StreamMetadata;
+import com.jwplayer.sqe.trident.function.AddField;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+
+public class FixedBatchSpoutStreamAdapter extends StreamAdapter {
+    FixedBatchSpoutOptions options;
+
+    public FixedBatchSpoutStreamAdapter(FixedBatchSpoutOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Stream makeStream(TridentTopology topology, String topologyName, String streamName, String objectName, StateType spoutType) {
+        List<Object>[] values = (List<Object>[]) options.values.toArray(new List[options.values.size()]);
+        FixedBatchSpout spout = new FixedBatchSpout(options.fields, options.values.size(), values);
+        Stream stream = topology.newStream(topologyName + "/" + streamName, spout);
+        long pid = StreamAdapter.createPid(topologyName, streamName, "");
+        StreamMetadata metadata = new StreamMetadata(pid, 0, 0L);
+        stream = stream.each(new Fields(), new AddField(metadata), new Fields(StreamAdapter.STREAM_METADATA_FIELD));
+        return stream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java
new file mode 100644
index 0000000..eb638a3
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class ListValuesCollector implements TridentCollector {
+    public List<List<Object>> values = new ArrayList<>();
+
+    public void emit(List<Object> values) {
+        this.values.add(values);
+    }
+
+    public void reportError(Throwable throwable) {
+        throw new RuntimeException(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java
new file mode 100644
index 0000000..99b6ad3
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java
@@ -0,0 +1,18 @@
+package com.jwplayer.sqe.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+import java.util.List;
+
+
+public class SingleValuesCollector implements TridentCollector {
+    public List<Object> values = null;
+
+    public void emit(List<Object> values) {
+        this.values = values;
+    }
+
+    public void reportError(Throwable throwable) {
+        throw new RuntimeException(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java
new file mode 100644
index 0000000..913ea63
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java
@@ -0,0 +1,43 @@
+package com.jwplayer.sqe.trident;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import org.apache.storm.shade.org.apache.commons.lang.ArrayUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+public class StreamMetadata implements Serializable {
+    public long pid;
+    public int partition;
+    public long offset;
+
+    public StreamMetadata(long pid, int partition, long offset) {
+        this.pid = pid;
+        this.partition = partition;
+        this.offset = offset;
+    }
+
+    public String getPidAndPartitionAsHex() {
+        return Long.toHexString(pid) + "-" + Integer.toHexString(partition);
+    }
+
+    public byte[] toBytes() {
+        return ArrayUtils.addAll(
+                Longs.toByteArray(pid),
+                ArrayUtils.addAll(Ints.toByteArray(partition), Longs.toByteArray(offset))
+        );
+    }
+
+    public static StreamMetadata parseBytes(byte[] bytes) {
+        Preconditions.checkArgument(bytes.length == 20, "Stream metadata bytes representation must contain exactly 20 bytes");
+
+        long pid = Longs.fromByteArray(Arrays.copyOfRange(bytes, 0, 8));
+        int partition = Ints.fromByteArray(Arrays.copyOfRange(bytes, 8, 12));
+        long offset = Longs.fromByteArray(Arrays.copyOfRange(bytes, 12, 20));
+
+        return new StreamMetadata(pid, partition, offset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java
new file mode 100644
index 0000000..fad4651
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java
@@ -0,0 +1,11 @@
+package com.jwplayer.sqe.trident;
+
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class ValueFilter extends BaseFilter {
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+        return tuple.getBoolean(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java
new file mode 100644
index 0000000..e106aa4
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java
@@ -0,0 +1,30 @@
+package com.jwplayer.sqe.trident.aggregator;
+
+import org.apache.storm.tuple.Values;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.IOException;
+
+
+/* This aggregator is for creating new cardinality estimators and efficiently adding fields you want
+ * to count. Once the object is created, you can use the associated CombinerAggregator for combing multiple
+ * objects into a single bitmap. */
+public abstract class CardinalityEstimatorAggregator extends BaseAggregator<ICardinality> {
+    @Override
+    public void aggregate(ICardinality cardinalityEstimator, TridentTuple tuple, TridentCollector collector) {
+        // Replicate SQL COUNT(DISTINCT) functionality to not count NULLs as a countable value
+        if(tuple.getValue(0) != null) cardinalityEstimator.offer(tuple.getValue(0));
+    }
+
+    @Override
+    public void complete(ICardinality cardinalityEstimator, TridentCollector collector) {
+        try {
+            collector.emit(new Values(cardinalityEstimator.getBytes()));
+        } catch(IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java
new file mode 100644
index 0000000..32da432
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.trident.aggregator;
+
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+/* This aggregator is for taking existing cardinality estimator bitmaps/objects and aggregating/combining them. */
+public abstract class CardinalityEstimatorCombinerAggregator implements CombinerAggregator<byte[]> {
+    @Override
+    public byte[] init(TridentTuple tuple) {
+        return (byte[]) tuple.getValue(0);
+    }
+}