You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/09/28 20:38:08 UTC
[04/11] storm git commit: initial SQE import
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java
new file mode 100644
index 0000000..1e08781
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandKeys.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "expandkeys"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.ExpandKeys} over the stream.
+ */
+public class ExpandKeys extends TransformExpression {
+    public ExpandKeys() {
+    }
+
+    /** @return the literal function name {@code "expandkeys"} */
+    @Override
+    public String getFunctionName() {
+        return "expandkeys";
+    }
+
+    /**
+     * Applies a new Trident {@code ExpandKeys} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.ExpandKeys(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java
new file mode 100644
index 0000000..9c3d331
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ExpandValues.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "expandvalues"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.ExpandValues} over the stream.
+ */
+public class ExpandValues extends TransformExpression {
+    public ExpandValues() {
+    }
+
+    /** @return the literal function name {@code "expandvalues"} */
+    @Override
+    public String getFunctionName() {
+        return "expandvalues";
+    }
+
+    /**
+     * Applies a new Trident {@code ExpandValues} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.ExpandValues(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java
new file mode 100644
index 0000000..dace3f5
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/FormatDate.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "formatdate"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.FormatDate} over the stream.
+ */
+public class FormatDate extends TransformExpression {
+    public FormatDate() {
+    }
+
+    /**
+     * Applies a new Trident {@code FormatDate} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.FormatDate(),
+                getOutputFields());
+    }
+
+    /** @return the literal function name {@code "formatdate"} */
+    @Override
+    public String getFunctionName() {
+        return "formatdate";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java
new file mode 100644
index 0000000..2d50425
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetMapValue.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "getmapvalue"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.GetMapValue} over the stream.
+ */
+public class GetMapValue extends TransformExpression {
+    public GetMapValue() {
+    }
+
+    /**
+     * Applies a new Trident {@code GetMapValue} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.GetMapValue(),
+                getOutputFields());
+    }
+
+    /** @return the literal function name {@code "getmapvalue"} */
+    @Override
+    public String getFunctionName() {
+        return "getmapvalue";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java
new file mode 100644
index 0000000..747a71a
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/GetTime.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "gettime"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.GetTime} over the stream.
+ */
+public class GetTime extends TransformExpression {
+    public GetTime() {
+    }
+
+    /**
+     * Applies a new Trident {@code GetTime} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.GetTime(),
+                getOutputFields());
+    }
+
+    /** @return the literal function name {@code "gettime"} */
+    @Override
+    public String getFunctionName() {
+        return "gettime";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java
new file mode 100644
index 0000000..f354a5f
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/Hash.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "hash"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.Hash} over the stream.
+ */
+public class Hash extends TransformExpression {
+    public Hash() {
+    }
+
+    /** @return the literal function name {@code "hash"} */
+    @Override
+    public String getFunctionName() {
+        return "hash";
+    }
+
+    /**
+     * Applies a new Trident {@code Hash} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.Hash(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java
new file mode 100644
index 0000000..fe61093
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/If.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import com.jwplayer.sqe.trident.function.GetIf;
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "if"}. It runs the Trident
+ * {@link GetIf} function over the stream.
+ */
+public class If extends TransformExpression {
+    public If() {
+    }
+
+    /** @return the literal function name {@code "if"} */
+    @Override
+    public String getFunctionName() {
+        return "if";
+    }
+
+    /**
+     * Applies a new {@link GetIf} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new GetIf(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java
new file mode 100644
index 0000000..86b077f
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ModulusArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+/**
+ * Arithmetic operator expression constructed with
+ * {@code ArithmeticOperatorType.Modulus}; its function name is {@code "%"}.
+ */
+public class ModulusArithmeticOperator extends ArithmeticOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "%";
+
+    /** Passes the Modulus operator type to the superclass constructor. */
+    public ModulusArithmeticOperator() {
+        super(ArithmeticOperatorType.Modulus);
+    }
+
+    /** @return the literal function name {@code "%"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java
new file mode 100644
index 0000000..232cedb
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/MultiplicationArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+/**
+ * Arithmetic operator expression constructed with
+ * {@code ArithmeticOperatorType.Multiplication}; its function name is {@code "*"}.
+ */
+public class MultiplicationArithmeticOperator extends ArithmeticOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "*";
+
+    /** Passes the Multiplication operator type to the superclass constructor. */
+    public MultiplicationArithmeticOperator() {
+        super(ArithmeticOperatorType.Multiplication);
+    }
+
+    /** @return the literal function name {@code "*"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java
new file mode 100644
index 0000000..be74931
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/ParseDate.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "parsedate"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.ParseDate} over the stream.
+ */
+public class ParseDate extends TransformExpression {
+    public ParseDate() {
+    }
+
+    /**
+     * Applies a new Trident {@code ParseDate} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.ParseDate(),
+                getOutputFields());
+    }
+
+    /** @return the literal function name {@code "parsedate"} */
+    @Override
+    public String getFunctionName() {
+        return "parsedate";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java
new file mode 100644
index 0000000..7268bea
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/PlusArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+/**
+ * Arithmetic operator expression constructed with
+ * {@code ArithmeticOperatorType.Addition}; its function name is {@code "+"}.
+ */
+public class PlusArithmeticOperator extends ArithmeticOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "+";
+
+    /** Passes the Addition operator type to the superclass constructor. */
+    public PlusArithmeticOperator() {
+        super(ArithmeticOperatorType.Addition);
+    }
+
+    /** @return the literal function name {@code "+"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java
new file mode 100644
index 0000000..5f4c02b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/RoundDate.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Transform expression whose function name is {@code "rounddate"}. It runs
+ * {@link com.jwplayer.sqe.trident.function.RoundDate} over the stream.
+ */
+public class RoundDate extends TransformExpression {
+    public RoundDate() {
+    }
+
+    /**
+     * Applies a new Trident {@code RoundDate} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new com.jwplayer.sqe.trident.function.RoundDate(),
+                getOutputFields());
+    }
+
+    /** @return the literal function name {@code "rounddate"} */
+    @Override
+    public String getFunctionName() {
+        return "rounddate";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java
new file mode 100644
index 0000000..c2d0c95
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/SubtractionArithmeticOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+
+/**
+ * Arithmetic operator expression constructed with
+ * {@code ArithmeticOperatorType.Subtraction}; its function name is {@code "-"}.
+ */
+public class SubtractionArithmeticOperator extends ArithmeticOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "-";
+
+    /** Passes the Subtraction operator type to the superclass constructor. */
+    public SubtractionArithmeticOperator() {
+        super(ArithmeticOperatorType.Subtraction);
+    }
+
+    /** @return the literal function name {@code "-"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java
new file mode 100644
index 0000000..2d4bb8c
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/TransformExpression.java
@@ -0,0 +1,18 @@
+package com.jwplayer.sqe.language.expression.transform;
+
+import com.jwplayer.sqe.language.expression.FunctionExpression;
+import com.jwplayer.sqe.language.expression.FunctionType;
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Base class for function expressions whose function type is
+ * {@link FunctionType#Transform}. Subclasses supply the stream transformation.
+ */
+public abstract class TransformExpression extends FunctionExpression {
+    public TransformExpression() {
+    }
+
+    /**
+     * Produces a stream from the given one; how is left to the subclass.
+     *
+     * @param stream the stream to transform
+     * @return the stream produced by the subclass
+     */
+    public abstract Stream transform(Stream stream);
+
+    /** @return always {@link FunctionType#Transform} */
+    @Override
+    public FunctionType getFunctionType() {
+        return FunctionType.Transform;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java
new file mode 100644
index 0000000..b8f82fa
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/AndLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Logical operator expression constructed with {@code LogicalOperatorType.And};
+ * its function name is {@code "and"}.
+ */
+public class AndLogicalOperator extends LogicalOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "and";
+
+    /** Passes the And operator type to the superclass constructor. */
+    public AndLogicalOperator() {
+        super(LogicalOperatorType.And);
+    }
+
+    /** @return the literal function name {@code "and"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java
new file mode 100644
index 0000000..d7e186b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/CompareNumbers.java
@@ -0,0 +1,21 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Base class for predicate expressions that compare numbers. The comparison
+ * type chosen by the subclass is handed to the Trident
+ * {@link com.jwplayer.sqe.trident.function.CompareNumbers} function.
+ */
+public abstract class CompareNumbers extends PredicateExpression {
+    /** Comparison type forwarded to the Trident function in {@link #transform}. */
+    private NumberComparisonType comparisonType;
+
+    /**
+     * @param comparisonType the comparison type to pass to the Trident function
+     */
+    public CompareNumbers(NumberComparisonType comparisonType) {
+        super();
+        this.comparisonType = comparisonType;
+    }
+
+    /**
+     * Builds a Trident {@code CompareNumbers} function for the stored comparison
+     * type and applies it through {@code Stream.each}, using this expression's
+     * input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        // Constructed before the field lookups, matching the original statement order.
+        com.jwplayer.sqe.trident.function.CompareNumbers comparison =
+                new com.jwplayer.sqe.trident.function.CompareNumbers(this.comparisonType);
+        return stream.each(getInputFields(), comparison, getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java
new file mode 100644
index 0000000..7b6c123
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/Equals.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.CompareObjects;
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Predicate expression whose function name is {@code "="}. It runs the Trident
+ * {@link CompareObjects} function over the stream.
+ */
+public class Equals extends PredicateExpression {
+    public Equals() {
+    }
+
+    /** @return the literal function name {@code "="} */
+    @Override
+    public String getFunctionName() {
+        return "=";
+    }
+
+    /**
+     * Applies a new {@link CompareObjects} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new CompareObjects(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java
new file mode 100644
index 0000000..8431754
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Number comparison expression constructed with
+ * {@code NumberComparisonType.GreaterThan}; its function name is {@code ">"}.
+ */
+public class GreaterThanCompareNumbers extends CompareNumbers {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = ">";
+
+    /** Passes the GreaterThan comparison type to the superclass constructor. */
+    public GreaterThanCompareNumbers() {
+        super(NumberComparisonType.GreaterThan);
+    }
+
+    /** @return the literal function name {@code ">"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java
new file mode 100644
index 0000000..7663e44
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/GreaterThanEqualCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Number comparison expression constructed with
+ * {@code NumberComparisonType.GreaterThanOrEqual}; its function name is {@code ">="}.
+ */
+public class GreaterThanEqualCompareNumbers extends CompareNumbers {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = ">=";
+
+    /** Passes the GreaterThanOrEqual comparison type to the superclass constructor. */
+    public GreaterThanEqualCompareNumbers() {
+        super(NumberComparisonType.GreaterThanOrEqual);
+    }
+
+    /** @return the literal function name {@code ">="} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java
new file mode 100644
index 0000000..4f9f771
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/In.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.IsIn;
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Predicate expression whose function name is {@code "in"}. It runs the Trident
+ * {@link IsIn} function over the stream.
+ */
+public class In extends PredicateExpression {
+    public In() {
+    }
+
+    /** @return the literal function name {@code "in"} */
+    @Override
+    public String getFunctionName() {
+        return "in";
+    }
+
+    /**
+     * Applies a new {@link IsIn} function to {@code stream} through
+     * {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new IsIn(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java
new file mode 100644
index 0000000..d1e7223
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Number comparison expression constructed with
+ * {@code NumberComparisonType.LessThan}; its function name is {@code "<"}.
+ */
+public class LessThanCompareNumbers extends CompareNumbers {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "<";
+
+    /** Passes the LessThan comparison type to the superclass constructor. */
+    public LessThanCompareNumbers() {
+        super(NumberComparisonType.LessThan);
+    }
+
+    /** @return the literal function name {@code "<"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java
new file mode 100644
index 0000000..a8ce47c
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LessThanEqualCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Number comparison expression constructed with
+ * {@code NumberComparisonType.LessThanOrEqual}; its function name is {@code "<="}.
+ */
+public class LessThanEqualCompareNumbers extends CompareNumbers {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "<=";
+
+    /** Passes the LessThanOrEqual comparison type to the superclass constructor. */
+    public LessThanEqualCompareNumbers() {
+        super(NumberComparisonType.LessThanOrEqual);
+    }
+
+    /** @return the literal function name {@code "<="} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java
new file mode 100644
index 0000000..f1c64aa
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperator.java
@@ -0,0 +1,19 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.ProcessLogicalOperator;
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Base class for logical operator predicates. The operator type chosen by the
+ * subclass is handed to the Trident {@link ProcessLogicalOperator} function.
+ */
+public abstract class LogicalOperator extends PredicateExpression {
+    /** Operator type forwarded to the Trident function in {@link #transform}. */
+    LogicalOperatorType operatorType;
+
+    /**
+     * @param operatorType the operator type to pass to the Trident function
+     */
+    public LogicalOperator(LogicalOperatorType operatorType) {
+        super();
+        this.operatorType = operatorType;
+    }
+
+    /**
+     * Applies a new {@link ProcessLogicalOperator} built from the stored operator
+     * type through {@code Stream.each}, using this expression's input and output
+     * fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new ProcessLogicalOperator(this.operatorType),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java
new file mode 100644
index 0000000..cf3e20c
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/LogicalOperatorType.java
@@ -0,0 +1,8 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+/**
+ * Kinds of logical operator. A value is passed to the {@code LogicalOperator}
+ * constructor, which forwards it to the Trident {@code ProcessLogicalOperator}
+ * function.
+ */
+public enum LogicalOperatorType {
+ And,
+ Not,
+ Or,
+ Xor
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java
new file mode 100644
index 0000000..f4ca8c6
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotEqualCompareNumbers.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Number comparison expression constructed with
+ * {@code NumberComparisonType.NotEqual}; its function name is {@code "!="}.
+ */
+public class NotEqualCompareNumbers extends CompareNumbers {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "!=";
+
+    /** Passes the NotEqual comparison type to the superclass constructor. */
+    public NotEqualCompareNumbers() {
+        super(NumberComparisonType.NotEqual);
+    }
+
+    /** @return the literal function name {@code "!="} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java
new file mode 100644
index 0000000..0925144
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NotLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Logical operator expression constructed with {@code LogicalOperatorType.Not};
+ * its function name is {@code "not"}.
+ */
+public class NotLogicalOperator extends LogicalOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "not";
+
+    /** Passes the Not operator type to the superclass constructor. */
+    public NotLogicalOperator() {
+        super(LogicalOperatorType.Not);
+    }
+
+    /** @return the literal function name {@code "not"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java
new file mode 100644
index 0000000..581cf40
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/NumberComparisonType.java
@@ -0,0 +1,10 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+/**
+ * Kinds of numeric comparison. A value is passed to the {@code CompareNumbers}
+ * expression constructor, which forwards it to the Trident
+ * {@code CompareNumbers} function.
+ */
+public enum NumberComparisonType {
+ Equal,
+ GreaterThan,
+ GreaterThanOrEqual,
+ LessThan,
+ LessThanOrEqual,
+ NotEqual
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java
new file mode 100644
index 0000000..80e24d3
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/OrLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Logical operator expression constructed with {@code LogicalOperatorType.Or};
+ * its function name is {@code "or"}.
+ */
+public class OrLogicalOperator extends LogicalOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "or";
+
+    /** Passes the Or operator type to the superclass constructor. */
+    public OrLogicalOperator() {
+        super(LogicalOperatorType.Or);
+    }
+
+    /** @return the literal function name {@code "or"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java
new file mode 100644
index 0000000..8523406
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/PredicateExpression.java
@@ -0,0 +1,9 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.language.expression.transform.TransformExpression;
+
+
+/**
+ * Abstract {@link TransformExpression} subtype used as the common parent of
+ * the predicate expressions; it declares no members of its own beyond the
+ * constructor.
+ */
+public abstract class PredicateExpression extends TransformExpression {
+    public PredicateExpression() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java
new file mode 100644
index 0000000..a4debed
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/RLike.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+import com.jwplayer.sqe.trident.function.EvaluateRegularExpression;
+import org.apache.storm.trident.Stream;
+
+
+/**
+ * Predicate expression whose function name is {@code "rlike"}. It runs the
+ * Trident {@link EvaluateRegularExpression} function over the stream.
+ */
+public class RLike extends PredicateExpression {
+    public RLike() {
+    }
+
+    /** @return the literal function name {@code "rlike"} */
+    @Override
+    public String getFunctionName() {
+        return "rlike";
+    }
+
+    /**
+     * Applies a new {@link EvaluateRegularExpression} function to {@code stream}
+     * through {@code Stream.each}, using this expression's input and output fields.
+     *
+     * @param stream the stream to apply the function to
+     * @return the stream returned by {@code stream.each(...)}
+     */
+    @Override
+    public Stream transform(Stream stream) {
+        return stream.each(
+                getInputFields(),
+                new EvaluateRegularExpression(),
+                getOutputFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java
new file mode 100644
index 0000000..ae722ce
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/expression/transform/predicate/XorLogicalOperator.java
@@ -0,0 +1,13 @@
+package com.jwplayer.sqe.language.expression.transform.predicate;
+
+
+/**
+ * Logical operator expression constructed with {@code LogicalOperatorType.Xor};
+ * its function name is {@code "xor"}.
+ */
+public class XorLogicalOperator extends LogicalOperator {
+    /** Name returned by {@link #getFunctionName()}. */
+    private static final String FUNCTION_NAME = "xor";
+
+    /** Passes the Xor operator type to the superclass constructor. */
+    public XorLogicalOperator() {
+        super(LogicalOperatorType.Xor);
+    }
+
+    /** @return the literal function name {@code "xor"} */
+    @Override
+    public String getFunctionName() {
+        return FUNCTION_NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java
new file mode 100644
index 0000000..720ce2d
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/BaseDeserializer.java
@@ -0,0 +1,47 @@
+package com.jwplayer.sqe.language.serde;
+
+import com.jwplayer.sqe.language.serde.avro.AvroDeserializer;
+import com.jwplayer.sqe.language.serde.avro.AvroDeserializerOptions;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Locale;
+import java.util.Map;
+
+
+/**
+ * Base type for stream deserializers, plus a factory that picks an implementation by name.
+ */
+public abstract class BaseDeserializer {
+    /**
+     * Produces the deserialized form of {@code stream}.
+     *
+     * @param stream         stream to deserialize
+     * @param requiredFields fields the caller needs from the deserialized stream
+     * @return the resulting stream
+     */
+    public abstract Stream deserialize(Stream stream, Fields requiredFields);
+
+    /**
+     * Creates the deserializer registered under {@code deserializerName}.
+     * Supported names, matched case-insensitively, are {@code avro} and {@code identity}.
+     *
+     * @param deserializerName    name of the deserializer to create
+     * @param deserializerOptions raw options; only read for {@code avro}
+     * @return a new deserializer instance
+     * @throws RuntimeException if the name is not supported
+     */
+    public static BaseDeserializer makeDeserializer(String deserializerName, Map deserializerOptions) {
+        // Locale.ROOT keeps the match independent of the JVM default locale; a plain
+        // toLowerCase() maps "IDENTITY" to a dotless-i form under e.g. a Turkish locale.
+        switch (deserializerName.toLowerCase(Locale.ROOT)) {
+            case "avro":
+                AvroDeserializerOptions avroOptions = AvroDeserializerOptions.parse(deserializerOptions);
+                return new AvroDeserializer(avroOptions);
+            case "identity":
+                return new IdentityDeserializer();
+            default:
+                throw new RuntimeException(deserializerName + " is not a supported deserializer");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java
new file mode 100644
index 0000000..76fc262
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/IdentityDeserializer.java
@@ -0,0 +1,29 @@
+package com.jwplayer.sqe.language.serde;
+
+
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Deserializer that performs no work and hands the input stream back unchanged.
+ */
+public class IdentityDeserializer extends BaseDeserializer {
+    /**
+     * Creates the deserializer; there is nothing to configure. Declared without a return
+     * type so that it is a constructor and not a method that shares the class name.
+     */
+    public IdentityDeserializer() {
+    }
+
+    /**
+     * Returns {@code stream} as-is; {@code requiredFields} is ignored.
+     *
+     * @param stream         stream to pass through
+     * @param requiredFields unused
+     * @return the same {@code stream} instance
+     */
+    @Override
+    public Stream deserialize(Stream stream, Fields requiredFields) {
+        return stream;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java
new file mode 100644
index 0000000..f5868f4
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializer.java
@@ -0,0 +1,37 @@
+package com.jwplayer.sqe.language.serde.avro;
+
+import com.jwplayer.sqe.language.serde.BaseDeserializer;
+import com.jwplayer.sqe.trident.function.GetTupleFromAvro;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+
+/**
+ * Deserializer that applies {@link GetTupleFromAvro} to a stream.
+ */
+public class AvroDeserializer extends BaseDeserializer {
+    /** Options supplying the schema name handed to {@link GetTupleFromAvro}. */
+    AvroDeserializerOptions options;
+
+    /**
+     * @param options options to read the schema name from
+     */
+    public AvroDeserializer(AvroDeserializerOptions options) {
+        this.options = options;
+    }
+
+    /**
+     * Feeds every output field of {@code stream} into {@link GetTupleFromAvro} and uses
+     * {@code requiredFields} as the function's output fields.
+     *
+     * @param stream         stream to deserialize
+     * @param requiredFields fields requested from the function
+     * @return the stream returned by {@code Stream.each}
+     */
+    @Override
+    public Stream deserialize(Stream stream, Fields requiredFields) {
+        Fields inputFields = stream.getOutputFields();
+        GetTupleFromAvro extractor = new GetTupleFromAvro(options.schemaName, requiredFields);
+        return stream.each(inputFields, extractor, requiredFields);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java
new file mode 100644
index 0000000..3abd155
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/serde/avro/AvroDeserializerOptions.java
@@ -0,0 +1,30 @@
+package com.jwplayer.sqe.language.serde.avro;
+
+import java.util.Map;
+
+/**
+ * Options for the Avro deserializer, read from a raw configuration map.
+ */
+public class AvroDeserializerOptions {
+    private static final String SCHEMA_NAME_KEY = "jw.sqe.spout.deserializer.avro.schemaname";
+
+    /** Avro schema name; stays {@code null} when the configuration does not supply one. */
+    public String schemaName = null;
+
+    /**
+     * Builds options from {@code map}.
+     *
+     * @param map configuration map; {@code jw.sqe.spout.deserializer.avro.schemaname} is the only key read
+     * @return a new options instance
+     * @throws ClassCastException if the schema name entry is not a {@code String}
+     */
+    public static AvroDeserializerOptions parse(Map map) {
+        AvroDeserializerOptions parsed = new AvroDeserializerOptions();
+        // A missing key and a key mapped to null both leave the field at its null default.
+        Object schemaName = map.get(SCHEMA_NAME_KEY);
+        if (schemaName != null) {
+            parsed.schemaName = (String) schemaName;
+        }
+        return parsed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java
new file mode 100644
index 0000000..0483a5b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateAdapter.java
@@ -0,0 +1,73 @@
+package com.jwplayer.sqe.language.state;
+
+import com.jwplayer.sqe.language.state.kafka.KafkaStateAdapter;
+import com.jwplayer.sqe.language.state.kafka.KafkaStateOptions;
+import com.jwplayer.sqe.language.state.mongodb.MongoDBStateAdapter;
+import com.jwplayer.sqe.language.state.redis.RedisStateAdapter;
+import com.jwplayer.sqe.language.state.redis.RedisStateOptions;
+import com.jwplayer.sqe.trident.state.mongodb.MongoDBStateOptions;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+
+/**
+ * Base type for adapters that build Trident state factories for a backing store, plus a
+ * factory method that selects the adapter by state name.
+ */
+public abstract class StateAdapter {
+    /**
+     * Builds the state factory for one persisted object.
+     *
+     * @param objectName         name of the target object
+     * @param keyFields          names of the key fields
+     * @param valueField         name of the value field
+     * @param stateType          requested Trident state type
+     * @param stateOperationType kind of operation the state is requested for
+     * @return the state factory to persist with
+     */
+    public abstract StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType);
+
+    /**
+     * Persists {@code stream} through {@code stateFactory} with a partition persist.
+     * This base implementation always throws; adapters that support it override it.
+     *
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    public TridentState partitionPersist(Stream stream, StateFactory stateFactory, Fields keyFields) {
+        throw new UnsupportedOperationException("This adapter does not support partition persist");
+    }
+
+    /**
+     * Creates the adapter registered under {@code stateName}.
+     * Supported names, matched case-insensitively, are {@code mongo}, {@code redis} and {@code kafka}.
+     *
+     * @param stateName name of the state backend
+     * @param options   raw options handed to the backend's option parser
+     * @return a new adapter instance
+     * @throws RuntimeException if the name is not supported
+     */
+    public static StateAdapter makeAdapter(String stateName, Map options) {
+        // Lower-case the name, as StreamAdapter.makeAdapter and BaseDeserializer.makeDeserializer
+        // do for theirs; Locale.ROOT keeps the result independent of the JVM default locale.
+        switch (stateName.toLowerCase(Locale.ROOT)) {
+            case "mongo":
+                MongoDBStateOptions mongoDBStateOptions = MongoDBStateOptions.parse(options);
+                return new MongoDBStateAdapter(mongoDBStateOptions);
+            case "redis":
+                RedisStateOptions redisStateOptions = RedisStateOptions.parse(options);
+                return new RedisStateAdapter(redisStateOptions);
+            case "kafka":
+                KafkaStateOptions kafkaStateOptions = KafkaStateOptions.parse(options);
+                return new KafkaStateAdapter(kafkaStateOptions);
+            default:
+                throw new RuntimeException("Unknown state name: " + stateName);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java
new file mode 100644
index 0000000..4b56fd2
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/StateOperationType.java
@@ -0,0 +1,12 @@
+package com.jwplayer.sqe.language.state;
+
+/**
+ * Kind of operation a state factory is requested for; passed to
+ * {@code StateAdapter.makeFactory}.
+ */
+public enum StateOperationType {
+    /** The state backs an aggregating operation. */
+    AGGREGATE,
+    /** The state backs a non-aggregating operation. */
+    NONAGGREGATE;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java
new file mode 100644
index 0000000..9cc927e
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/JwTridentKafkaStateFactory.java
@@ -0,0 +1,105 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.task.IMetricsContext;
+
+
+/**
+ * State factory that builds a {@link TridentKafkaState} whose producer settings come
+ * from {@link KafkaStateOptions} layered over the topology configuration.
+ */
+public class JwTridentKafkaStateFactory implements StateFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JwTridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+    /** Producer options; must be set through {@link #withKafkaOptions} before use. */
+    public KafkaStateOptions kafka_options;
+
+    /** Sets the tuple-to-message mapper and returns this factory for chaining. */
+    public JwTridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /** Sets the producer options and returns this factory for chaining. */
+    public JwTridentKafkaStateFactory withKafkaOptions(KafkaStateOptions options) {
+        kafka_options = options;
+        return this;
+    }
+
+    /** Sets the topic selector and returns this factory for chaining. */
+    public JwTridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    /**
+     * Creates a {@link TridentKafkaState} and prepares it with the merged producer
+     * configuration from {@link #getconf(Map)}.
+     *
+     * @param conf           topology configuration
+     * @param metrics        unused
+     * @param partitionIndex index of this partition; only logged
+     * @param numPartitions  number of partitions; only logged
+     * @return the prepared Kafka state
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
+
+        TridentKafkaState state = new TridentKafkaState()
+                .withKafkaTopicSelector(this.topicSelector)
+                .withTridentTupleToKafkaMapper(this.mapper);
+        Properties kafkaConf = new Properties();
+        for (Map.Entry entry : (Set<Map.Entry>) this.getconf(conf).entrySet()) {
+            // Properties is a Hashtable and rejects null values, so unset options are skipped.
+            if (entry.getValue() != null) {
+                kafkaConf.put(entry.getKey(), entry.getValue());
+            }
+        }
+        state.prepare(kafkaConf);
+        return state;
+    }
+
+    /**
+     * Returns a copy of {@code conf} with the Kafka producer settings from the configured
+     * options added on top. Values may be {@code null} when an option was not set.
+     *
+     * @param conf base configuration to copy
+     * @return a new map holding {@code conf} plus the producer settings
+     * @throws IllegalStateException if no options were set via {@link #withKafkaOptions}
+     */
+    @SuppressWarnings("unchecked")
+    public Map getconf(Map conf) {
+        KafkaStateOptions sqeKafkaOptions = kafka_options;
+        if (sqeKafkaOptions == null) {
+            throw new IllegalStateException("Kafka options must be set with withKafkaOptions() before the state is created");
+        }
+        Map kafkaConf = new HashMap();
+        kafkaConf.putAll(conf);
+        // Storm 1.0 uses the Kafka 0.9 producer, which has different names for the configs
+        kafkaConf.put("bootstrap.servers", sqeKafkaOptions.metadataBrokerList);
+        kafkaConf.put("acks", sqeKafkaOptions.requestRequiredAcks);
+        kafkaConf.put("producer.type", sqeKafkaOptions.producerType);
+        kafkaConf.put("key.serializer", sqeKafkaOptions.KeyserializerClass);
+        kafkaConf.put("value.serializer", sqeKafkaOptions.serializerClass);
+        kafkaConf.put("partitioner.class", sqeKafkaOptions.partitionClass);
+
+        return kafkaConf;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java
new file mode 100644
index 0000000..71ac874
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateAdapter.java
@@ -0,0 +1,126 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import com.jwplayer.sqe.language.state.StateAdapter;
+import com.jwplayer.sqe.language.state.StateOperationType;
+import com.jwplayer.sqe.language.stream.StreamAdapter;
+import com.jwplayer.sqe.trident.function.ConvertMetadataToBytes;
+import com.jwplayer.sqe.trident.function.Hash;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.tuple.Fields;
+
+
+/**
+ * State adapter that writes a stream to Kafka through a partition persist.
+ */
+public class KafkaStateAdapter extends StateAdapter {
+    /** Name of the field that carries a derived Kafka message key. */
+    public static final String KAFKA_KEY = "_kafka_state_key";
+    protected KafkaStateOptions options;
+
+    /**
+     * @param options producer options and key type used by this adapter
+     */
+    public KafkaStateAdapter(KafkaStateOptions options) {
+        this.options = options;
+    }
+
+    /**
+     * Builds a Kafka state factory that publishes to the topic named {@code objectName}.
+     * The remaining arguments are ignored.
+     */
+    @Override
+    public StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType) {
+        return getTridentStateFactory(objectName);
+    }
+
+    /**
+     * Creates a factory bound to this adapter's options and a fixed {@code topic}.
+     *
+     * @param topic topic handed to the {@link DefaultTopicSelector}
+     * @return the new factory; its tuple mapper is set later by {@link #partitionPersist}
+     */
+    protected JwTridentKafkaStateFactory getTridentStateFactory(String topic) {
+        JwTridentKafkaStateFactory factory = new JwTridentKafkaStateFactory();
+        factory.withKafkaOptions(options).withKafkaTopicSelector(new DefaultTopicSelector(topic));
+
+        return factory;
+    }
+
+    /**
+     * Persists {@code stream} to Kafka, choosing the message key by the configured key type:
+     * <ul>
+     * <li>{@code field}: the first key field is the key, the second is the message;</li>
+     * <li>{@code messagehash}: the first key field is the message, {@link Hash} of it is the key;</li>
+     * <li>{@code streammetadata}: the first key field is the message, the stream metadata
+     *     field run through {@link ConvertMetadataToBytes} is the key.</li>
+     * </ul>
+     *
+     * @param stream       stream to persist
+     * @param stateFactory a {@link JwTridentKafkaStateFactory}; its tuple mapper is set here
+     * @param keyFields    fields to persist, ordered as described above
+     * @return the resulting Trident state
+     * @throws IllegalArgumentException if the key type is not supported or too few key
+     *                                  fields were supplied for it
+     */
+    @Override
+    public TridentState partitionPersist(Stream stream, StateFactory stateFactory, Fields keyFields) {
+        JwTridentKafkaStateFactory factory = (JwTridentKafkaStateFactory) stateFactory;
+        List<String> persistFields = new ArrayList<>();
+        for (String fieldName : keyFields) {
+            persistFields.add(fieldName);
+        }
+
+        // Everything that depends on the key type is resolved in this single switch, so an
+        // unsupported type is rejected before the caller's factory is modified.
+        String keyField;
+        String messageField;
+        switch (options.KeyType) {
+            case "field":
+                requireKeyFields(keyFields, 2);
+                keyField = keyFields.get(0);
+                messageField = keyFields.get(1);
+                break;
+            case "messagehash":
+                requireKeyFields(keyFields, 1);
+                keyField = KAFKA_KEY;
+                messageField = keyFields.get(0);
+                stream = stream.each(new Fields(messageField), new Hash(), new Fields(KAFKA_KEY));
+                persistFields.add(KAFKA_KEY);
+                break;
+            case "streammetadata":
+                requireKeyFields(keyFields, 1);
+                keyField = KAFKA_KEY;
+                messageField = keyFields.get(0);
+                stream = stream.each(
+                        new Fields(StreamAdapter.STREAM_METADATA_FIELD),
+                        new ConvertMetadataToBytes(),
+                        new Fields(KAFKA_KEY));
+                persistFields.add(KAFKA_KEY);
+                break;
+            default:
+                throw new IllegalArgumentException(options.KeyType + " is not a supported key type");
+        }
+
+        factory.withTridentTupleToKafkaMapper(
+                new FieldNameBasedTupleToKafkaMapper<byte[], byte[]>(keyField, messageField));
+        return stream.partitionPersist(factory, new Fields(persistFields), new TridentKafkaUpdater());
+    }
+
+    /** Fails with a descriptive error when fewer than {@code required} key fields are given. */
+    private void requireKeyFields(Fields keyFields, int required) {
+        if (keyFields.size() < required) {
+            throw new IllegalArgumentException("Key type " + options.KeyType + " needs at least "
+                    + required + " key field(s), got " + keyFields.size());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java
new file mode 100644
index 0000000..9093c11
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/KafkaStateOptions.java
@@ -0,0 +1,90 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+import com.google.common.base.Joiner;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Kafka producer settings for the Kafka state, read from a raw configuration map.
+ * Fields keep the defaults below unless the matching {@code jw.sqe.state.kafka.*} key is set.
+ */
+public class KafkaStateOptions implements Serializable {
+    public String serializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
+    /** Comma-separated broker list; {@code null} until configured. */
+    public String metadataBrokerList;
+    public String producerType = "async";
+    public String requestRequiredAcks = "1";
+    public String partitionClass = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
+    public String KeyserializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
+    /** How the message key is derived; stored lower-cased. */
+    public String KeyType = "messagehash";
+
+    /**
+     * Builds options from {@code map}. Keys that are absent leave the field default in place.
+     *
+     * @param map raw configuration; the broker entry must be a list of strings, the
+     *            required-acks entry may be a string or a number, the rest must be strings
+     * @return a new options instance
+     * @throws ClassCastException if an entry has an unexpected type
+     */
+    @SuppressWarnings("unchecked")
+    public static KafkaStateOptions parse(Map map) {
+        KafkaStateOptions options = new KafkaStateOptions();
+
+        if (map.containsKey("jw.sqe.state.kafka.brokers")) {
+            options.metadataBrokerList =
+                    Joiner.on(',').join((List<String>) map.get("jw.sqe.state.kafka.brokers"));
+        }
+
+        if (map.containsKey("jw.sqe.state.kafka.serializerClass")) {
+            options.serializerClass =
+                    (String) map.get("jw.sqe.state.kafka.serializerClass");
+        }
+
+        if (map.containsKey("jw.sqe.state.kafka.key.serializerClass")) {
+            options.KeyserializerClass =
+                    (String) map.get("jw.sqe.state.kafka.key.serializerClass");
+        }
+
+        if (map.containsKey("jw.sqe.state.kafka.partitionClass")) {
+            options.partitionClass =
+                    (String) map.get("jw.sqe.state.kafka.partitionClass");
+        }
+
+        if (map.containsKey("jw.sqe.state.kafka.producerType")) {
+            options.producerType =
+                    (String) map.get("jw.sqe.state.kafka.producerType");
+        }
+
+        if (map.containsKey("jw.sqe.state.kafka.request.requiredAcks")) {
+            options.requestRequiredAcks =
+                    acksToString(map.get("jw.sqe.state.kafka.request.requiredAcks"));
+        }
+
+        if (map.containsKey("jw.sqe.state.kafka.keytype")) {
+            // Locale.ROOT keeps the result independent of the JVM default locale.
+            options.KeyType =
+                    ((String) map.get("jw.sqe.state.kafka.keytype")).toLowerCase(Locale.ROOT);
+        }
+
+        return options;
+    }
+
+    /**
+     * Renders a required-acks setting as the string the producer configuration expects.
+     * Configuration files commonly hold this value as a bare number (for example {@code 1}),
+     * which a plain {@code String} cast would reject.
+     */
+    private static String acksToString(Object value) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Number) {
+            return Long.toString(((Number) value).longValue());
+        }
+        return value.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java
new file mode 100644
index 0000000..38074b2
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/kafka/SourcePartitionPartitioner.java
@@ -0,0 +1,41 @@
+package com.jwplayer.sqe.language.state.kafka;
+
+
+import com.jwplayer.sqe.trident.StreamMetadata;
+
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kafka partitioner that derives the target partition from the partition number stored
+ * in the {@link StreamMetadata} parsed from the message key bytes.
+ */
+public class SourcePartitionPartitioner implements Partitioner {
+    /**
+     * Parses {@code keyBytes} as {@link StreamMetadata} and maps its partition number onto
+     * the partitions of {@code topic} by taking the remainder.
+     *
+     * @return the metadata's partition modulo the topic's partition count
+     */
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        int partitionCount = cluster.partitionsForTopic(topic).size();
+        int sourcePartition = StreamMetadata.parseBytes(keyBytes).partition;
+        return sourcePartition % partitionCount;
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+    }
+
+    /** This partitioner takes no configuration. */
+    @Override
+    public void configure(Map<String, ?> map) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java
new file mode 100644
index 0000000..6ef1c88
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/mongodb/MongoDBStateAdapter.java
@@ -0,0 +1,50 @@
+package com.jwplayer.sqe.language.state.mongodb;
+
+import com.jwplayer.sqe.language.state.StateAdapter;
+import com.jwplayer.sqe.language.state.StateOperationType;
+import com.jwplayer.sqe.trident.state.mongodb.MongoDBState;
+import com.jwplayer.sqe.trident.state.mongodb.MongoDBStateOptions;
+
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+
+import java.util.List;
+
+/**
+ * State adapter that produces {@link MongoDBState} factories.
+ */
+public class MongoDBStateAdapter extends StateAdapter {
+    private MongoDBStateOptions options;
+
+    /**
+     * @param options options passed to every factory this adapter creates
+     */
+    public MongoDBStateAdapter(MongoDBStateOptions options) {
+        this.options = options;
+    }
+
+    /**
+     * Picks the {@link MongoDBState} factory matching {@code stateType}.
+     * {@code stateOperationType} is not consulted.
+     *
+     * @throws RuntimeException if {@code stateType} is none of the three handled types
+     */
+    @Override
+    public StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType) {
+        final StateFactory factory;
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                factory = MongoDBState.nonTransactional(objectName, keyFields, valueField, options);
+                break;
+            case OPAQUE:
+                factory = MongoDBState.opaque(objectName, keyFields, valueField, options);
+                break;
+            case TRANSACTIONAL:
+                factory = MongoDBState.transactional(objectName, keyFields, valueField, options);
+                break;
+            default:
+                throw new RuntimeException("Unknown state type: " + stateType);
+        }
+        return factory;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java
new file mode 100644
index 0000000..986878b
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateAdapter.java
@@ -0,0 +1,169 @@
+package com.jwplayer.sqe.language.state.redis;
+
+import com.jwplayer.sqe.trident.state.GsonOpaqueSerializer;
+import com.jwplayer.sqe.trident.state.GsonTransactionalSerializer;
+import com.jwplayer.sqe.language.state.StateAdapter;
+import com.jwplayer.sqe.language.state.StateOperationType;
+import com.jwplayer.sqe.trident.state.redis.RedisKeyFactory;
+
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.Options;
+import org.apache.storm.redis.trident.state.RedisDataTypeDescription;
+import org.apache.storm.redis.trident.state.RedisMapState;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * State adapter for Redis. Aggregating operations get a {@link RedisMapState} factory;
+ * non-aggregating operations get a {@link RedisStateFactory} that is written through
+ * {@link #partitionPersist}.
+ */
+public class RedisStateAdapter extends StateAdapter {
+    protected RedisStateOptions options;
+
+    public RedisStateAdapter(RedisStateOptions options) { this.options = options; }
+
+    /**
+     * Persists {@code stream} with a {@link RedisStateUpdater} whose keys and values are
+     * built by {@link StoreMapper}, applying the configured expiry interval.
+     *
+     * @param stateFactory must be the {@link RedisStateFactory} returned by
+     *                     {@link #makeFactory} for a non-aggregating operation
+     */
+    @Override
+    public TridentState partitionPersist(Stream stream, StateFactory stateFactory, Fields keyFields) {
+        RedisStateFactory factory = (RedisStateFactory) stateFactory;
+        RedisStateUpdater updater = new RedisStateUpdater(new StoreMapper(this.options,
+                factory.objectName, factory.streamFields));
+        updater.setExpireInterval(options.expireintervalsec);
+        return stream.partitionPersist(stateFactory, keyFields, updater);
+    }
+
+    /**
+     * Returns the positions in {@code fields} whose names appear in {@code selected},
+     * in ascending order.
+     */
+    private static List<Integer> indexesOf(List<String> fields, List<String> selected) {
+        List<Integer> indexes = new ArrayList<>();
+        for (int i = 0; i < fields.size(); i++) {
+            if (selected.contains(fields.get(i))) {
+                indexes.add(i);
+            }
+        }
+        return indexes;
+    }
+
+    /** Maps tuples to the Redis key and value strings used by the non-aggregating path. */
+    private static class StoreMapper implements RedisStoreMapper {
+        private static final long serialVersionUID = 1L;
+        private RedisKeyFactory keyStringFactory;
+        private RedisKeyFactory valueStringFactory;
+        private org.apache.storm.redis.common.mapper.RedisDataTypeDescription description;
+
+        public StoreMapper(RedisStateOptions options, String objectName, List<String> streamFields) {
+            // No additional key since not aggregating
+            description = new org.apache.storm.redis.common.mapper.RedisDataTypeDescription(
+                    org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SET,
+                    objectName);
+
+            keyStringFactory = new RedisKeyFactory(
+                    options.delimiter, indexesOf(streamFields, options.keyNameFields), "", "");
+            valueStringFactory = new RedisKeyFactory(
+                    options.delimiter, indexesOf(streamFields, options.fieldNameFields), "", "");
+        }
+
+        @Override
+        public org.apache.storm.redis.common.mapper.RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return keyStringFactory.build(tuple.getValues());
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return valueStringFactory.build(tuple.getValues());
+        }
+    }
+
+    /**
+     * Builds the Redis state factory for one persisted object.
+     * <p>
+     * For {@code NONAGGREGATE} operations a plain {@link RedisStateFactory} is returned.
+     * Otherwise a {@link RedisMapState} factory matching {@code stateType} is returned, with
+     * its key layout derived from the configured Redis data type.
+     *
+     * @throws IllegalArgumentException if the configured data type is not STRING, HASH or SET
+     * @throws RuntimeException         if {@code stateType} is not a handled state type
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public StateFactory makeFactory(String objectName, List<String> keyFields, String valueField, StateType stateType, StateOperationType stateOperationType) {
+        JedisPoolConfig poolConfig =
+                new JedisPoolConfig.Builder()
+                        .setDatabase(options.database)
+                        .setHost(options.host)
+                        .setPort(options.port)
+                        .setTimeout(15000)
+                        .build();
+        Options rOptions = new Options();
+
+        // Build the Redis options based on configuration from SQE
+        switch (options.dataType) {
+            case STRING:
+                List<Integer> indexes = new ArrayList<>();
+
+                for (int i = 0; i < keyFields.size(); i++) indexes.add(i);
+
+                rOptions.keyFactory = new RedisKeyFactory(options.delimiter, indexes, objectName, valueField);
+                rOptions.dataTypeDescription = new RedisDataTypeDescription(options.dataType);
+                break;
+            case HASH:
+            case SET:
+                List<Integer> keyNameFieldIndexes = indexesOf(keyFields, options.keyNameFields);
+                List<Integer> fieldNameFieldIndexes = indexesOf(keyFields, options.fieldNameFields);
+
+                rOptions.keyFactory = new RedisKeyFactory(options.delimiter, keyNameFieldIndexes, objectName, "");
+                rOptions.dataTypeDescription =
+                        new RedisDataTypeDescription(options.dataType,
+                                new RedisKeyFactory(options.delimiter, fieldNameFieldIndexes, "", valueField));
+                break;
+            default:
+                // LIST, SORTED_SET, HYPER_LOG_LOG and any data type added later: fail loudly
+                // rather than continue with an Options object that has no key factory.
+                throw new IllegalArgumentException("Unsupported Redis data type: " + options.dataType);
+        }
+
+        // Set TTL for the keys
+        rOptions.expireIntervalSec = options.expireintervalsec;
+
+        if (stateOperationType == StateOperationType.NONAGGREGATE) {
+            return new RedisStateFactory(poolConfig, objectName, keyFields);
+        }
+        // Return the appropriate state factory
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return RedisMapState.nonTransactional(poolConfig, rOptions);
+            case OPAQUE:
+                rOptions.serializer = new GsonOpaqueSerializer();
+                return RedisMapState.opaque(poolConfig, rOptions);
+            case TRANSACTIONAL:
+                rOptions.serializer = new GsonTransactionalSerializer();
+                return RedisMapState.transactional(poolConfig, rOptions);
+            default:
+                throw new RuntimeException("Unknown state type: " + stateType);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java
new file mode 100644
index 0000000..9faa980
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateFactory.java
@@ -0,0 +1,32 @@
+package com.jwplayer.sqe.language.state.redis;
+
+import java.util.List;
+
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.trident.state.RedisState;
+
+/**
+ * Similar to {@link RedisState.Factory}, but also carries an object name (used to
+ * name-space hash data in Redis) and the names of the stream fields.
+ */
+public class RedisStateFactory extends RedisState.Factory {
+    private static final long serialVersionUID = 1L;
+    /** Names of the stream fields supplied at construction. */
+    public List<String> streamFields;
+    /** Object name supplied at construction. */
+    public String objectName;
+
+    /**
+     * Creates a factory for the given pool configuration and stores the extra metadata.
+     *
+     * @param config       Jedis pool configuration handed to {@link RedisState.Factory}
+     * @param objectName   object name to store (additional key when persisting to a hash)
+     * @param streamFields names of the stream fields to store
+     */
+    public RedisStateFactory(JedisPoolConfig config, String objectName, List<String> streamFields) {
+        super(config);
+        this.streamFields = streamFields;
+        this.objectName = objectName;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java
new file mode 100644
index 0000000..3699480
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/state/redis/RedisStateOptions.java
@@ -0,0 +1,80 @@
+package com.jwplayer.sqe.language.state.redis;
+
+import org.apache.storm.redis.trident.state.RedisDataTypeDescription.RedisDataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Connection and key-layout settings for the Redis state, read from a raw configuration map.
+ * Fields keep the defaults below unless the matching {@code jw.sqe.state.redis.*} key is set.
+ */
+public class RedisStateOptions implements Serializable {
+    public int database = 0;
+    public RedisDataType dataType = RedisDataType.STRING;
+    public String delimiter = ":";
+    public List<String> fieldNameFields = new ArrayList<>();
+    public String host = "";
+    public List<String> keyNameFields = new ArrayList<>();
+    public int port = 6379;
+    public int expireintervalsec = 0;
+
+    /**
+     * Builds options from {@code map}. Keys that are absent leave the field default in place.
+     * The numeric settings (database, port, expiry interval) may be given either as a
+     * number or as a decimal string.
+     *
+     * @param map raw configuration
+     * @return a new options instance
+     * @throws IllegalArgumentException if a numeric setting is neither a number nor a
+     *                                  parsable string, or the data type name is unknown
+     */
+    @SuppressWarnings("unchecked")
+    public static RedisStateOptions parse(Map map) {
+        RedisStateOptions options = new RedisStateOptions();
+
+        if (map.containsKey("jw.sqe.state.redis.database")) {
+            options.database = toInt(map, "jw.sqe.state.redis.database");
+        }
+        if (map.containsKey("jw.sqe.state.redis.datatype")) {
+            options.dataType = RedisDataType.valueOf((String) map.get("jw.sqe.state.redis.datatype"));
+        }
+        if (map.containsKey("jw.sqe.state.redis.delimiter")) {
+            options.delimiter = (String) map.get("jw.sqe.state.redis.delimiter");
+        }
+        if (map.containsKey("jw.sqe.state.redis.expireintervalsec")) {
+            options.expireintervalsec = toInt(map, "jw.sqe.state.redis.expireintervalsec");
+        }
+        if (map.containsKey("jw.sqe.state.redis.fieldname.fields")) {
+            options.fieldNameFields = (List<String>) map.get("jw.sqe.state.redis.fieldname.fields");
+        }
+        if (map.containsKey("jw.sqe.state.redis.host")) {
+            options.host = (String) map.get("jw.sqe.state.redis.host");
+        }
+        if (map.containsKey("jw.sqe.state.redis.keyname.fields")) {
+            options.keyNameFields = (List<String>) map.get("jw.sqe.state.redis.keyname.fields");
+        }
+        if (map.containsKey("jw.sqe.state.redis.port")) {
+            options.port = toInt(map, "jw.sqe.state.redis.port");
+        }
+
+        return options;
+    }
+
+    /**
+     * Reads the entry for {@code key} as an int, accepting a {@code Number} or a numeric
+     * {@code String}, so a port written as 6379 and one written as "6379" both work.
+     */
+    private static int toInt(Map map, String key) {
+        Object value = map.get(key);
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        if (value instanceof String) {
+            return Integer.parseInt(((String) value).trim());
+        }
+        throw new IllegalArgumentException(key + " must be a number or a numeric string, got: " + value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java
new file mode 100644
index 0000000..7bebbd6
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/InputStreamType.java
@@ -0,0 +1,12 @@
+package com.jwplayer.sqe.language.stream;
+
+/**
+ * Formats an input stream can arrive in.
+ * NOTE(review): the meaning of each constant is inferred from its name only; confirm against the callers.
+ */
+public enum InputStreamType {
+    /** Presumably Avro-encoded binary records. */
+    BinaryAvro,
+    /** Presumably already-structured tuples. */
+    Tuple;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java
new file mode 100644
index 0000000..f43c35a
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamAdapter.java
@@ -0,0 +1,67 @@
+package com.jwplayer.sqe.language.stream;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.jwplayer.sqe.language.stream.kafka.KafkaStreamAdapter;
+import com.jwplayer.sqe.language.stream.kafka.KafkaStreamAdapterOptions;
+import com.jwplayer.sqe.language.stream.testing.FixedBatchSpoutOptions;
+import com.jwplayer.sqe.language.stream.testing.FixedBatchSpoutStreamAdapter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateType;
+
+import java.util.Locale;
+import java.util.Map;
+
+
+/**
+ * Base class for adapters that attach an input stream to a Trident topology, plus a factory
+ * that selects the adapter implementation from a spout name.
+ */
+public abstract class StreamAdapter {
+    /** Name of the tuple field used to carry stream metadata. */
+    public static final String STREAM_METADATA_FIELD = "_stream_metadata";
+
+    /**
+     * Creates the stream for this adapter on the given topology.
+     *
+     * @param topology     topology the stream is added to
+     * @param topologyName name of the topology
+     * @param streamName   name of the stream within the topology
+     * @param objectName   name of the source object to read (the Kafka adapter uses it as the topic)
+     * @param spoutType    requested state type of the spout
+     * @return the newly created stream
+     */
+    public abstract Stream makeStream(TridentTopology topology, String topologyName, String streamName, String objectName, StateType spoutType);
+
+    /**
+     * Hashes the concatenation of the three arguments with {@code MurmurHash.hash64}.
+     *
+     * @return the hash of {@code topologyName + streamName + secondaryID}
+     */
+    public static Long createPid(String topologyName, String streamName, String secondaryID) {
+        return MurmurHash.hash64(topologyName + streamName + secondaryID);
+    }
+
+    /**
+     * Builds the adapter registered under the given spout name.
+     *
+     * @param spoutName case-insensitive spout name; {@code "fixed"} and {@code "kafka"} are supported
+     * @param options   raw option map handed to the selected adapter's option parser
+     * @return a configured adapter
+     * @throws RuntimeException if the spout name is not recognised
+     */
+    public static StreamAdapter makeAdapter(String spoutName, Map options) {
+        // Lower-case with a fixed locale: under e.g. a Turkish default locale "FIXED".toLowerCase()
+        // produces a dotless i and would no longer match the "fixed" label below.
+        switch (spoutName.toLowerCase(Locale.ROOT)) {
+            case "fixed":
+                FixedBatchSpoutOptions fixedOptions = FixedBatchSpoutOptions.parse(options);
+                return new FixedBatchSpoutStreamAdapter(fixedOptions);
+            case "kafka":
+                KafkaStreamAdapterOptions kafkaOptions = KafkaStreamAdapterOptions.parse(options);
+                return new KafkaStreamAdapter(kafkaOptions);
+            default:
+                throw new RuntimeException("Unknown spout name: " + spoutName);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java
new file mode 100644
index 0000000..8f15ac1
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/StreamContainer.java
@@ -0,0 +1,24 @@
+package com.jwplayer.sqe.language.stream;
+
+import org.apache.storm.trident.Stream;
+
+/**
+ * Plain holder that groups a Trident stream with its name and input type.
+ */
+public class StreamContainer {
+    /** Input type associated with {@link #stream}. */
+    public InputStreamType inputStreamType;
+    /** The Trident stream being carried. */
+    public Stream stream;
+    /** Name given to {@link #stream}. */
+    public String streamName;
+
+    /**
+     * Stores the three values exactly as given; nothing is validated or copied.
+     */
+    public StreamContainer(InputStreamType inputStreamType, Stream stream, String streamName) {
+        this.streamName = streamName;
+        this.stream = stream;
+        this.inputStreamType = inputStreamType;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java
new file mode 100644
index 0000000..e3e5c91
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapter.java
@@ -0,0 +1,74 @@
+package com.jwplayer.sqe.language.stream.kafka;
+
+import com.jwplayer.sqe.language.stream.StreamAdapter;
+import com.jwplayer.sqe.trident.spout.kafka.FilteredOpaqueTridentKafkaSpout;
+import com.jwplayer.sqe.trident.spout.kafka.FilteredTransactionalTridentKafkaSpout;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateType;
+
+
+/**
+ * Stream adapter that reads from Kafka through one of four Trident Kafka spouts, chosen by the
+ * requested state type and by whether replay filtering is enabled in the options.
+ */
+public class KafkaStreamAdapter extends StreamAdapter {
+    KafkaStreamAdapterOptions options;
+
+    public KafkaStreamAdapter(KafkaStreamAdapterOptions options) {
+        this.options = options;
+    }
+
+    /**
+     * Registers a Kafka spout on the topology under the id {@code topologyName/streamName}.
+     *
+     * @param objectName passed to the options as the Kafka topic
+     * @param spoutType  {@code TRANSACTIONAL} or {@code OPAQUE}
+     * @return the stream produced by the new spout
+     * @throws RuntimeException for any other state type
+     */
+    @Override
+    public Stream makeStream(TridentTopology topology, String topologyName, String streamName, String objectName, StateType spoutType) {
+        final String spoutId = topologyName + "/" + streamName;
+
+        if (!options.filterReplays) {
+            return plainStream(topology, spoutId, topologyName, streamName, objectName, spoutType);
+        }
+        return replayFilteringStream(topology, spoutId, topologyName, streamName, objectName, spoutType);
+    }
+
+    /** Builds the stream from the replay-filtering spout variants. */
+    private Stream replayFilteringStream(TridentTopology topology, String spoutId, String topologyName, String streamName, String objectName, StateType spoutType) {
+        switch (spoutType) {
+            case TRANSACTIONAL:
+                return topology.newStream(spoutId, new FilteredTransactionalTridentKafkaSpout(
+                        options.getKafkaConfig(topologyName, streamName, objectName), options.filterReplaysMetadataTtl));
+            case OPAQUE:
+                return topology.newStream(spoutId, new FilteredOpaqueTridentKafkaSpout(
+                        options.getKafkaConfig(topologyName, streamName, objectName), options.filterReplaysMetadataTtl));
+            default:
+                throw unsupported(spoutType);
+        }
+    }
+
+    /** Builds the stream from the unfiltered storm-kafka Trident spouts. */
+    private Stream plainStream(TridentTopology topology, String spoutId, String topologyName, String streamName, String objectName, StateType spoutType) {
+        switch (spoutType) {
+            case TRANSACTIONAL:
+                return topology.newStream(spoutId, new TransactionalTridentKafkaSpout(
+                        options.getKafkaConfig(topologyName, streamName, objectName)));
+            case OPAQUE:
+                return topology.newStream(spoutId, new OpaqueTridentKafkaSpout(
+                        options.getKafkaConfig(topologyName, streamName, objectName)));
+            default:
+                throw unsupported(spoutType);
+        }
+    }
+
+    /** Creates the exception raised for state types other than transactional and opaque. */
+    private static RuntimeException unsupported(StateType spoutType) {
+        return new RuntimeException(spoutType.toString() + " is not a supported state type");
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java
new file mode 100644
index 0000000..2c70053
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/kafka/KafkaStreamAdapterOptions.java
@@ -0,0 +1,79 @@
+package com.jwplayer.sqe.language.stream.kafka;
+
+import com.google.common.base.Joiner;
+import com.jwplayer.sqe.trident.spout.kafka.SqeRawFullScheme;
+import org.apache.storm.kafka.FullSchemeAsMultiScheme;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Options for the Kafka stream adapter, parsed from the {@code jw.sqe.spout.kafka.*} keys.
+ */
+public class KafkaStreamAdapterOptions implements Serializable {
+    Integer bufferSizeBytes = null;
+    String clientID;
+    Integer fetchSizeBytes = null;
+    Boolean filterReplays = false;
+    // 172800; presumably seconds, i.e. two days -- TODO confirm the unit against the filtered spouts.
+    Long filterReplaysMetadataTtl = 60L * 60L * 24L * 2L;
+    Long maxOffsetBehind = null;
+    ZkHosts zkHosts;
+
+    /**
+     * Builds the Trident Kafka configuration for one topic.
+     * The optional sizes and the offset limit are only applied when they were set, so the
+     * configuration's own initial values stay in effect otherwise.
+     *
+     * @param topologyName forwarded to the record scheme
+     * @param streamName   forwarded to the record scheme
+     * @param topic        Kafka topic to read
+     * @return a new configuration whose scheme wraps {@link SqeRawFullScheme}
+     */
+    public TridentKafkaConfig getKafkaConfig(String topologyName, String streamName, String topic) {
+        TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic, clientID);
+        if (bufferSizeBytes != null) config.bufferSizeBytes = bufferSizeBytes;
+        if (fetchSizeBytes != null) config.fetchSizeBytes = fetchSizeBytes;
+        if (maxOffsetBehind != null) config.maxOffsetBehind = maxOffsetBehind;
+        config.scheme = new FullSchemeAsMultiScheme(
+                new SqeRawFullScheme(topologyName, streamName, zkHosts)
+        );
+
+        return config;
+    }
+
+    /**
+     * Reads the adapter options from a raw configuration map.
+     * <p>
+     * Numeric options are accepted as any {@link Number}: configuration loaders differ in whether
+     * they produce {@code Integer} or {@code Long}, and a direct primitive cast fails with a
+     * {@code ClassCastException} on the other one. Optional keys that are absent or mapped to
+     * {@code null} leave the default in place.
+     *
+     * @param map raw options; {@code jw.sqe.spout.kafka.zkhosts} must hold a list of host strings
+     * @return the parsed options
+     */
+    @SuppressWarnings("unchecked")
+    public static KafkaStreamAdapterOptions parse(Map map) {
+        KafkaStreamAdapterOptions options = new KafkaStreamAdapterOptions();
+        options.zkHosts = new ZkHosts(Joiner.on(',').join((List<String>) map.get("jw.sqe.spout.kafka.zkhosts")));
+        options.clientID = (String) map.get("jw.sqe.spout.kafka.clientid");
+
+        Number bufferSizeBytes = (Number) map.get("jw.sqe.spout.kafka.bufferSizeBytes");
+        if (bufferSizeBytes != null) options.bufferSizeBytes = bufferSizeBytes.intValue();
+        Number fetchSizeBytes = (Number) map.get("jw.sqe.spout.kafka.fetchSizeBytes");
+        if (fetchSizeBytes != null) options.fetchSizeBytes = fetchSizeBytes.intValue();
+        Boolean filterReplays = (Boolean) map.get("jw.sqe.spout.kafka.filterReplays");
+        if (filterReplays != null) options.filterReplays = filterReplays;
+        Number metadataTtl = (Number) map.get("jw.sqe.spout.kafka.filterReplays.metadata.ttl");
+        if (metadataTtl != null) options.filterReplaysMetadataTtl = metadataTtl.longValue();
+        Number maxOffsetBehind = (Number) map.get("jw.sqe.spout.kafka.maxOffsetBehind");
+        if (maxOffsetBehind != null) options.maxOffsetBehind = maxOffsetBehind.longValue();
+
+        return options;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java
new file mode 100644
index 0000000..defb4db
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutOptions.java
@@ -0,0 +1,48 @@
+package com.jwplayer.sqe.language.stream.testing;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Options for the fixed-batch testing spout: the output field names and the tuples to emit.
+ */
+public class FixedBatchSpoutOptions implements Serializable {
+    public Fields fields;
+    public List<List<Object>> values;
+
+    /**
+     * Reads {@code jw.sqe.spout.fixed.fields} and {@code jw.sqe.spout.fixed.values} from the map.
+     * {@link #fields} stays {@code null} when its key is absent; {@link #values} is always a
+     * (possibly empty) list.
+     *
+     * @param map raw options
+     * @return the parsed options
+     */
+    @SuppressWarnings("unchecked")
+    public static FixedBatchSpoutOptions parse(Map map) {
+        FixedBatchSpoutOptions parsed = new FixedBatchSpoutOptions();
+
+        if (map.containsKey("jw.sqe.spout.fixed.fields")) {
+            List fieldNames = (List) map.get("jw.sqe.spout.fixed.fields");
+            parsed.fields = new Fields(fieldNames);
+        }
+
+        parsed.values = new LinkedList<>();
+        if (!map.containsKey("jw.sqe.spout.fixed.values")) {
+            return parsed;
+        }
+
+        List<List<Object>> rows = (List) map.get("jw.sqe.spout.fixed.values");
+        for (List<Object> row : rows) {
+            // Each configured row becomes one tuple.
+            parsed.values.add(new Values(row.toArray()));
+        }
+        return parsed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java
new file mode 100644
index 0000000..310763e
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/language/stream/testing/FixedBatchSpoutStreamAdapter.java
@@ -0,0 +1,41 @@
+package com.jwplayer.sqe.language.stream.testing;
+
+import com.jwplayer.sqe.language.stream.StreamAdapter;
+import com.jwplayer.sqe.trident.StreamMetadata;
+import com.jwplayer.sqe.trident.function.AddField;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+
+/**
+ * Stream adapter backed by Trident's {@link FixedBatchSpout}, fed with the tuples listed in the options.
+ */
+public class FixedBatchSpoutStreamAdapter extends StreamAdapter {
+    FixedBatchSpoutOptions options;
+
+    public FixedBatchSpoutStreamAdapter(FixedBatchSpoutOptions options) {
+        this.options = options;
+    }
+
+    /**
+     * Creates a fixed-batch spout whose batch-size argument equals the number of configured tuples,
+     * then runs {@code AddField} with a {@link StreamMetadata} (partition 0, offset 0) to fill the
+     * stream metadata field. {@code objectName} and {@code spoutType} are not used by this adapter.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Stream makeStream(TridentTopology topology, String topologyName, String streamName, String objectName, StateType spoutType) {
+        final int tupleCount = options.values.size();
+        List<Object>[] outputs = (List<Object>[]) options.values.toArray(new List[tupleCount]);
+        FixedBatchSpout spout = new FixedBatchSpout(options.fields, tupleCount, outputs);
+
+        Stream source = topology.newStream(topologyName + "/" + streamName, spout);
+        StreamMetadata metadata = new StreamMetadata(StreamAdapter.createPid(topologyName, streamName, ""), 0, 0L);
+        return source.each(new Fields(), new AddField(metadata), new Fields(StreamAdapter.STREAM_METADATA_FIELD));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java
new file mode 100644
index 0000000..eb638a3
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ListValuesCollector.java
@@ -0,0 +1,26 @@
+package com.jwplayer.sqe.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Collector that keeps every emitted tuple, in emission order, in {@link #values}.
+ */
+public class ListValuesCollector implements TridentCollector {
+    public List<List<Object>> values = new ArrayList<>();
+
+    /** Appends the emitted tuple to {@link #values}. */
+    @Override
+    public void emit(List<Object> values) {
+        this.values.add(values);
+    }
+
+    /** Rethrows the reported error wrapped in a {@link RuntimeException}. */
+    @Override
+    public void reportError(Throwable throwable) {
+        throw new RuntimeException(throwable);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java
new file mode 100644
index 0000000..99b6ad3
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/SingleValuesCollector.java
@@ -0,0 +1,26 @@
+package com.jwplayer.sqe.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+
+import java.util.List;
+
+
+/**
+ * Collector that remembers only the most recently emitted tuple in {@link #values};
+ * the field is {@code null} until something has been emitted.
+ */
+public class SingleValuesCollector implements TridentCollector {
+    public List<Object> values = null;
+
+    /** Replaces {@link #values} with the emitted tuple. */
+    @Override
+    public void emit(List<Object> values) {
+        this.values = values;
+    }
+
+    /** Rethrows the reported error wrapped in a {@link RuntimeException}. */
+    @Override
+    public void reportError(Throwable throwable) {
+        throw new RuntimeException(throwable);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java
new file mode 100644
index 0000000..913ea63
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/StreamMetadata.java
@@ -0,0 +1,66 @@
+package com.jwplayer.sqe.trident;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import org.apache.storm.shade.org.apache.commons.lang.ArrayUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Carries a pid, a partition and an offset for a stream record, with a fixed 20-byte binary
+ * form: 8 bytes of pid, then 4 bytes of partition, then 8 bytes of offset.
+ */
+public class StreamMetadata implements Serializable {
+    public long pid;
+    public int partition;
+    public long offset;
+
+    public StreamMetadata(long pid, int partition, long offset) {
+        this.offset = offset;
+        this.partition = partition;
+        this.pid = pid;
+    }
+
+    /**
+     * @return the pid and the partition, each as unsigned hexadecimal, joined by {@code -}
+     */
+    public String getPidAndPartitionAsHex() {
+        String pidHex = Long.toHexString(pid);
+        String partitionHex = Integer.toHexString(partition);
+        return pidHex + "-" + partitionHex;
+    }
+
+    /**
+     * @return the 20-byte form of this metadata, readable by {@link #parseBytes(byte[])}
+     */
+    public byte[] toBytes() {
+        byte[] pidBytes = Longs.toByteArray(pid);
+        byte[] partitionBytes = Ints.toByteArray(partition);
+        byte[] offsetBytes = Longs.toByteArray(offset);
+        byte[] tail = ArrayUtils.addAll(partitionBytes, offsetBytes);
+        return ArrayUtils.addAll(pidBytes, tail);
+    }
+
+    /**
+     * Decodes the form produced by {@link #toBytes()}.
+     *
+     * @param bytes exactly 20 bytes
+     * @return the decoded metadata
+     * @throws IllegalArgumentException if the array is not 20 bytes long
+     */
+    public static StreamMetadata parseBytes(byte[] bytes) {
+        Preconditions.checkArgument(bytes.length == 20, "Stream metadata bytes representation must contain exactly 20 bytes");
+
+        byte[] pidBytes = Arrays.copyOfRange(bytes, 0, 8);
+        byte[] partitionBytes = Arrays.copyOfRange(bytes, 8, 12);
+        byte[] offsetBytes = Arrays.copyOfRange(bytes, 12, 20);
+
+        return new StreamMetadata(
+                Longs.fromByteArray(pidBytes),
+                Ints.fromByteArray(partitionBytes),
+                Longs.fromByteArray(offsetBytes));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java
new file mode 100644
index 0000000..fad4651
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/ValueFilter.java
@@ -0,0 +1,20 @@
+package com.jwplayer.sqe.trident;
+
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Filter that keeps a tuple only when its first field is {@code true}.
+ */
+public class ValueFilter extends BaseFilter {
+    /**
+     * @param tuple tuple whose first field holds a {@code Boolean}
+     * @return {@code true} only for {@code Boolean.TRUE}; a {@code null} field drops the tuple
+     */
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+        // getBoolean returns a boxed Boolean; returning it directly would throw a
+        // NullPointerException on unboxing whenever the field is null.
+        return Boolean.TRUE.equals(tuple.getBoolean(0));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java
new file mode 100644
index 0000000..e106aa4
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorAggregator.java
@@ -0,0 +1,36 @@
+package com.jwplayer.sqe.trident.aggregator;
+
+import org.apache.storm.tuple.Values;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.IOException;
+
+
+/* Aggregator that offers the first field of each tuple to a cardinality estimator and emits the
+ * estimator's serialized bytes when complete is called. Serialized estimators can then be merged
+ * with the associated CombinerAggregator. */
+public abstract class CardinalityEstimatorAggregator extends BaseAggregator<ICardinality> {
+    @Override
+    public void aggregate(ICardinality cardinalityEstimator, TridentTuple tuple, TridentCollector collector) {
+        // Like SQL COUNT(DISTINCT), NULL is not treated as a countable value.
+        Object candidate = tuple.getValue(0);
+        if (candidate == null) {
+            return;
+        }
+        cardinalityEstimator.offer(candidate);
+    }
+
+    @Override
+    public void complete(ICardinality cardinalityEstimator, TridentCollector collector) {
+        byte[] serialized;
+        try {
+            serialized = cardinalityEstimator.getBytes();
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+        collector.emit(new Values(serialized));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/3654b8ca/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java
----------------------------------------------------------------------
diff --git a/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java
new file mode 100644
index 0000000..32da432
--- /dev/null
+++ b/external/storm-sqe/src/main/java/com/jwplayer/sqe/trident/aggregator/CardinalityEstimatorCombinerAggregator.java
@@ -0,0 +1,15 @@
+package com.jwplayer.sqe.trident.aggregator;
+
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+/* Base for combiner aggregators that merge existing cardinality estimator bitmaps/objects. */
+public abstract class CardinalityEstimatorCombinerAggregator implements CombinerAggregator<byte[]> {
+    /* The first tuple field is returned as-is, cast to the serialized estimator bytes. */
+    @Override
+    public byte[] init(TridentTuple tuple) {
+        Object serialized = tuple.getValue(0);
+        return (byte[]) serialized;
+    }
+}