You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/06 03:38:41 UTC
[21/51] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support
message filtering based on SQL92 closes apache/incubator-rocketmq#82
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
new file mode 100644
index 0000000..ef81b29
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BitsArrayTest {
+
+    /**
+     * Builds a {@link BitsArray} of {@code bitCount} bits and fills every whole byte
+     * ({@code bitCount / Byte.SIZE} of them) with a random value.
+     *
+     * @param bitCount number of bits the array should hold
+     * @return the randomly populated array
+     */
+    BitsArray gen(int bitCount) {
+        BitsArray bitsArray = BitsArray.create(bitCount);
+
+        // One generator for the whole array: no need to sleep between bytes just to
+        // obtain a different time-based seed for each of them.
+        Random random = new Random();
+        for (int i = 0; i < bitCount / Byte.SIZE; i++) {
+            // The bound is exclusive, so 0x100 lets every byte value (including 0xff) occur.
+            bitsArray.setByte(i, (byte) random.nextInt(0x100));
+        }
+
+        return bitsArray;
+    }
+
+ int bitLength = Byte.SIZE;
+
+    /**
+     * Checks the byte and bit lengths reported by arrays created for 8, 9 and 7 bits:
+     * the bit length must equal the requested size and the byte length must be the
+     * number of whole bytes needed to hold it.
+     */
+    @Test
+    public void testConstructor() {
+        // Each property is asserted on its own so that a failure reports the offending value
+        // instead of a bare "expected true but was false".
+        BitsArray bitsArray = BitsArray.create(8);
+
+        assertThat(bitsArray.byteLength()).isEqualTo(1);
+        assertThat(bitsArray.bitLength()).isEqualTo(8);
+
+        bitsArray = BitsArray.create(9);
+
+        assertThat(bitsArray.byteLength()).isEqualTo(2);
+        assertThat(bitsArray.bitLength()).isEqualTo(9);
+
+        bitsArray = BitsArray.create(7);
+
+        assertThat(bitsArray.byteLength()).isEqualTo(1);
+        assertThat(bitsArray.bitLength()).isEqualTo(7);
+    }
+
+ @Test
+ public void testSet() {
+ BitsArray bitsArray = gen(bitLength);
+ BitsArray backUp = bitsArray.clone();
+
+ boolean val = bitsArray.getBit(2);
+
+ bitsArray.setBit(2, !val);
+
+ bitsArray.xor(backUp);
+
+ assertThat(bitsArray.getBit(2)).isTrue();
+ }
+
+    /**
+     * Flips bit 2 of a randomly generated array to the opposite of its current value:
+     * a set bit is cleared through {@code and(2, false)}, a clear bit is set through
+     * {@code or(2, true)}; the new value is then read back.
+     */
+    @Test
+    public void testAndOr() {
+        final int pos = 2;
+        BitsArray bits = gen(bitLength);
+
+        if (!bits.getBit(pos)) {
+            bits.or(pos, true);
+            assertThat(bits.getBit(pos)).isTrue();
+            return;
+        }
+
+        bits.and(pos, false);
+        assertThat(bits.getBit(pos)).isFalse();
+    }
+
+ @Test
+ public void testXor() {
+ BitsArray bitsArray = gen(bitLength);
+
+ boolean val = bitsArray.getBit(2);
+
+ bitsArray.xor(2, !val);
+
+ assertThat(bitsArray.getBit(2)).isTrue();
+ }
+
+ @Test
+ public void testNot() {
+ BitsArray bitsArray = gen(bitLength);
+ BitsArray backUp = bitsArray.clone();
+
+ bitsArray.not(2);
+
+ bitsArray.xor(backUp);
+
+ assertThat(bitsArray.getBit(2)).isTrue();
+ }
+
+ @Test
+ public void testOr() {
+ BitsArray b1 = BitsArray.create(new byte[]{(byte) 0xff, 0x00});
+ BitsArray b2 = BitsArray.create(new byte[]{0x00, (byte) 0xff});
+
+ b1.or(b2);
+
+ for (int i = 0; i < b1.bitLength(); i++) {
+ assertThat(b1.getBit(i)).isTrue();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java
new file mode 100644
index 0000000..c6097ee
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/BloomFilterTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.filter.util.BloomFilter;
+import org.apache.rocketmq.filter.util.BloomFilterData;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BloomFilterTest {
+
+ @Test
+ public void testEquals() {
+ BloomFilter a = BloomFilter.createByFn(10, 20);
+
+ BloomFilter b = BloomFilter.createByFn(10, 20);
+
+ BloomFilter c = BloomFilter.createByFn(12, 20);
+
+ BloomFilter d = BloomFilter.createByFn(10, 30);
+
+ assertThat(a).isEqualTo(b);
+ assertThat(a).isNotEqualTo(c);
+ assertThat(a).isNotEqualTo(d);
+ assertThat(d).isNotEqualTo(c);
+
+ assertThat(a.hashCode()).isEqualTo(b.hashCode());
+ assertThat(a.hashCode()).isNotEqualTo(c.hashCode());
+ assertThat(a.hashCode()).isNotEqualTo(d.hashCode());
+ assertThat(c.hashCode()).isNotEqualTo(d.hashCode());
+ }
+
+ @Test
+ public void testHashTo() {
+ String cid = "CID_abc_efg";
+
+ BloomFilter bloomFilter = BloomFilter.createByFn(10, 20);
+
+ BitsArray bits = BitsArray.create(bloomFilter.getM());
+
+ int[] bitPos = bloomFilter.calcBitPositions(cid);
+
+ bloomFilter.hashTo(cid, bits);
+
+ for (int bit : bitPos) {
+ assertThat(bits.getBit(bit)).isTrue();
+ }
+ }
+
+ @Test
+ public void testCalcBitPositions() {
+ String cid = "CID_abc_efg";
+
+ BloomFilter bloomFilter = BloomFilter.createByFn(10, 20);
+
+ int[] bitPos = bloomFilter.calcBitPositions(cid);
+
+ assertThat(bitPos).isNotNull();
+ assertThat(bitPos.length).isEqualTo(bloomFilter.getK());
+
+ int[] bitPos2 = bloomFilter.calcBitPositions(cid);
+
+ assertThat(bitPos2).isNotNull();
+ assertThat(bitPos2.length).isEqualTo(bloomFilter.getK());
+
+ assertThat(bitPos).isEqualTo(bitPos2);
+ }
+
+    /**
+     * Verifies {@code isHit} for two keys, before and after the second key is hashed
+     * into the same bit set.
+     */
+    @Test
+    public void testIsHit() {
+        final String first = "CID_abc_efg";
+        final String second = "CID_abc_123";
+
+        BloomFilter filter = BloomFilter.createByFn(10, 20);
+        BitsArray bitSet = BitsArray.create(filter.getM());
+
+        // Only the first key has been recorded so far.
+        filter.hashTo(first, bitSet);
+        assertThat(filter.isHit(first, bitSet)).isTrue();
+        assertThat(filter.isHit(second, bitSet)).isFalse();
+
+        // Once the second key is recorded as well, both must be reported as hits.
+        filter.hashTo(second, bitSet);
+        assertThat(filter.isHit(first, bitSet)).isTrue();
+        assertThat(filter.isHit(second, bitSet)).isTrue();
+    }
+
+ @Test
+ public void testBloomFilterData() {
+ BloomFilterData bloomFilterData = new BloomFilterData(new int[]{1, 2, 3}, 128);
+ BloomFilterData bloomFilterData1 = new BloomFilterData(new int[]{1, 2, 3}, 128);
+ BloomFilterData bloomFilterData2 = new BloomFilterData(new int[]{1, 2, 3}, 129);
+
+ assertThat(bloomFilterData).isEqualTo(bloomFilterData1);
+ assertThat(bloomFilterData2).isNotEqualTo(bloomFilterData);
+ assertThat(bloomFilterData2).isNotEqualTo(bloomFilterData1);
+
+ assertThat(bloomFilterData.hashCode()).isEqualTo(bloomFilterData1.hashCode());
+ assertThat(bloomFilterData2.hashCode()).isNotEqualTo(bloomFilterData.hashCode());
+ assertThat(bloomFilterData2.hashCode()).isNotEqualTo(bloomFilterData1.hashCode());
+
+ assertThat(bloomFilterData.getBitPos()).isEqualTo(bloomFilterData2.getBitPos());
+ assertThat(bloomFilterData.getBitNum()).isEqualTo(bloomFilterData1.getBitNum());
+ assertThat(bloomFilterData.getBitNum()).isNotEqualTo(bloomFilterData2.getBitNum());
+
+ bloomFilterData2.setBitNum(128);
+
+ assertThat(bloomFilterData).isEqualTo(bloomFilterData2);
+
+ bloomFilterData2.setBitPos(new int[]{1, 2, 3, 4});
+
+ assertThat(bloomFilterData).isNotEqualTo(bloomFilterData2);
+
+ BloomFilterData nullData = new BloomFilterData();
+
+ assertThat(nullData.getBitNum()).isEqualTo(0);
+ assertThat(nullData.getBitPos()).isNull();
+
+ BloomFilter bloomFilter = BloomFilter.createByFn(1, 300);
+
+ assertThat(bloomFilter).isNotNull();
+ assertThat(bloomFilter.isValid(bloomFilterData)).isFalse();
+ }
+
+    /**
+     * Inserts {@code getN()} random strings into a filter created with
+     * {@code createByFn(1, 300)} and counts how often a string is reported as already
+     * present before it has been inserted. The count must not exceed
+     * {@code getF() * getN() / 100}.
+     */
+    @Test
+    public void testCheckFalseHit() {
+        BloomFilter bloomFilter = BloomFilter.createByFn(1, 300);
+        BitsArray bits = BitsArray.create(bloomFilter.getM());
+        // A single generator for all lengths; re-seeding a new one from the clock on every
+        // iteration adds nothing and can repeat seeds when the clock does not advance.
+        Random random = new Random();
+        int falseHit = 0;
+        for (int i = 0; i < bloomFilter.getN(); i++) {
+            String str = randomString(random.nextInt(127) + 10);
+            int[] bitPos = bloomFilter.calcBitPositions(str);
+
+            // Check before inserting, so a hit here means the bits were set by other strings.
+            if (bloomFilter.checkFalseHit(bitPos, bits)) {
+                falseHit++;
+            }
+
+            bloomFilter.hashTo(bitPos, bits);
+        }
+
+        assertThat(falseHit).isLessThanOrEqualTo(bloomFilter.getF() * bloomFilter.getN() / 100);
+    }
+
+    /**
+     * Produces a string of {@code length} random lower-case ASCII letters ('a' to 'z').
+     *
+     * @param length number of characters to generate
+     * @return the generated string
+     */
+    private String randomString(int length) {
+        // One generator per call: seeding a new Random from System.nanoTime() for every
+        // character yields the same character again whenever two calls see the same tick.
+        Random random = new Random();
+        StringBuilder stringBuilder = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            // 97 is 'a' and 123 is one past 'z'.
+            stringBuilder.append((char) (random.nextInt(123 - 97) + 97));
+        }
+
+        return stringBuilder.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java
new file mode 100644
index 0000000..0ee81c9
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/ExpressionTest.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.expression.ComparisonExpression;
+import org.apache.rocketmq.filter.expression.ConstantExpression;
+import org.apache.rocketmq.filter.expression.EvaluationContext;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.apache.rocketmq.filter.expression.PropertyExpression;
+import org.apache.rocketmq.filter.parser.SelectorParser;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ExpressionTest {
+
+ private static String andExpression = "a=3 and b<>4 And c>5 AND d<=4";
+ private static String orExpression = "a=3 or b<>4 Or c>5 OR d<=4";
+ private static String inExpression = "a in ('3', '4', '5')";
+ private static String notInExpression = "a not in ('3', '4', '5')";
+ private static String betweenExpression = "a between 2 and 10";
+ private static String notBetweenExpression = "a not between 2 and 10";
+ private static String isNullExpression = "a is null";
+ private static String isNotNullExpression = "a is not null";
+ private static String equalExpression = "a is not null and a='hello'";
+ private static String booleanExpression = "a=TRUE OR b=FALSE";
+ private static String nullOrExpression = "a is null OR a='hello'";
+ private static String stringHasString = "TAGS is not null and TAGS='''''tag'''''";
+
+ @Test
+ public void testEvaluate_stringHasString() {
+ Expression expr = genExp(stringHasString);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("TAGS", "''tag''")
+ );
+
+ eval(expr, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_now() {
+ EvaluationContext context = genContext(
+ KeyValue.c("a", System.currentTimeMillis())
+ );
+
+ Expression nowExpression = ConstantExpression.createNow();
+ Expression propertyExpression = new PropertyExpression("a");
+
+ Expression expression = ComparisonExpression.createLessThanEqual(propertyExpression,
+ nowExpression);
+
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_stringCompare() {
+ Expression expression = genExp("a between up and low");
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "3.14")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+
+ {
+ context = genContext(
+ KeyValue.c("a", "3.14"),
+ KeyValue.c("up", "up"),
+ KeyValue.c("low", "low")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ {
+ expression = genExp("key is not null and key between 0 and 100");
+
+ context = genContext(
+ KeyValue.c("key", "con")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ {
+ expression = genExp("a between 0 and 100");
+
+ context = genContext(
+ KeyValue.c("a", "abc")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ {
+ expression = genExp("a=b");
+
+ context = genContext(
+ KeyValue.c("a", "3.14"),
+ KeyValue.c("b", "3.14")
+ );
+
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ {
+ expression = genExp("a<>b");
+
+ context = genContext(
+ KeyValue.c("a", "3.14"),
+ KeyValue.c("b", "3.14")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ {
+ expression = genExp("a<>b");
+
+ context = genContext(
+ KeyValue.c("a", "3.14"),
+ KeyValue.c("b", "3.141")
+ );
+
+ eval(expression, context, Boolean.TRUE);
+ }
+ }
+
+ @Test
+ public void testEvaluate_exponent() {
+ Expression expression = genExp("a > 3.1E10");
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", String.valueOf(3.1415 * Math.pow(10, 10)))
+ );
+
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_floatNumber() {
+ Expression expression = genExp("a > 3.14");
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", String.valueOf(3.1415))
+ );
+
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_twoVariable() {
+ Expression expression = genExp("a > b");
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", String.valueOf(10)),
+ KeyValue.c("b", String.valueOf(20))
+ );
+
+ eval(expression, context, Boolean.FALSE);
+
+ context = genContext(
+ KeyValue.c("b", String.valueOf(10)),
+ KeyValue.c("a", String.valueOf(20))
+ );
+
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_nullOr() {
+ Expression expression = genExp(nullOrExpression);
+
+ EvaluationContext context = genContext(
+ );
+
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "hello")
+ );
+
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "abc")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_boolean() {
+ Expression expression = genExp(booleanExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "true"),
+ KeyValue.c("b", "false")
+ );
+
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "false"),
+ KeyValue.c("b", "true")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_equal() {
+ Expression expression = genExp(equalExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "hello")
+ );
+
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+    /**
+     * Evaluates {@code andExpression} against a context that satisfies every clause,
+     * first with numeric property values (many times over) and then with the same
+     * values supplied as strings. Every evaluation is expected to be TRUE.
+     */
+    @Test
+    public void testEvaluate_andTrue() {
+        Expression expression = genExp(andExpression);
+
+        EvaluationContext context = genContext(
+            KeyValue.c("a", 3),
+            KeyValue.c("b", 5),
+            KeyValue.c("c", 6),
+            KeyValue.c("d", 1)
+        );
+
+        for (int i = 0; i < 500; i++) {
+            eval(expression, context, Boolean.TRUE);
+        }
+
+        // Re-evaluate the same expression/context pair many more times; the result
+        // must stay TRUE on every pass.
+        for (int j = 0; j < 100; j++) {
+            for (int i = 0; i < 1000; i++) {
+                eval(expression, context, Boolean.TRUE);
+            }
+        }
+
+        // use string
+        context = genContext(
+            KeyValue.c("a", "3"),
+            KeyValue.c("b", "5"),
+            KeyValue.c("c", "6"),
+            KeyValue.c("d", "1")
+        );
+
+        eval(expression, context, Boolean.TRUE);
+    }
+
+ @Test
+ public void testEvaluate_andFalse() {
+ Expression expression = genExp(andExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", 4),
+ KeyValue.c("b", 5),
+ KeyValue.c("c", 6),
+ KeyValue.c("d", 1)
+ );
+
+ eval(expression, context, Boolean.FALSE);
+
+ // use string
+ context = genContext(
+ KeyValue.c("a", "4"),
+ KeyValue.c("b", "5"),
+ KeyValue.c("c", "6"),
+ KeyValue.c("d", "1")
+ );
+
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ /**
+ * Evaluates {@code orExpression} ("a=3 or b<>4 Or c>5 OR d<=4") against four contexts,
+ * each of which satisfies a different clause, so every evaluation is expected to be TRUE.
+ */
+ @Test
+ public void testEvaluate_orTrue() {
+ Expression expression = genExp(orExpression);
+
+ // first clause holds: a=3
+ EvaluationContext context = genContext(
+ KeyValue.c("a", 3)
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ // second clause holds: b<>4
+ context = genContext(
+ KeyValue.c("a", 4),
+ KeyValue.c("b", 5)
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ // third clause holds: c>5
+ context = genContext(
+ KeyValue.c("a", 4),
+ KeyValue.c("b", 4),
+ KeyValue.c("c", 6)
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ // fourth clause holds: d<=4
+ context = genContext(
+ KeyValue.c("a", 4),
+ KeyValue.c("b", 4),
+ KeyValue.c("c", 3),
+ KeyValue.c("d", 2)
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ /**
+ * Evaluates {@code orExpression} ("a=3 or b<>4 Or c>5 OR d<=4") against a context in
+ * which none of the four clauses holds, so the evaluation is expected to be FALSE.
+ */
+ @Test
+ public void testEvaluate_orFalse() {
+ Expression expression = genExp(orExpression);
+ // no clause holds: a is not 3, b equals 4, c is not above 5, d is above 4
+ EvaluationContext context = genContext(
+ KeyValue.c("a", 4),
+ KeyValue.c("b", 4),
+ KeyValue.c("c", 3),
+ KeyValue.c("d", 10)
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_inTrue() {
+ Expression expression = genExp(inExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "3")
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "4")
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "5")
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_inFalse() {
+ Expression expression = genExp(inExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "8")
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_notInTrue() {
+ Expression expression = genExp(notInExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "8")
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_notInFalse() {
+ Expression expression = genExp(notInExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "3")
+ );
+ eval(expression, context, Boolean.FALSE);
+
+ context = genContext(
+ KeyValue.c("a", "4")
+ );
+ eval(expression, context, Boolean.FALSE);
+
+ context = genContext(
+ KeyValue.c("a", "5")
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_betweenTrue() {
+ Expression expression = genExp(betweenExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "2")
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "10")
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "3")
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_betweenFalse() {
+ Expression expression = genExp(betweenExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "1")
+ );
+ eval(expression, context, Boolean.FALSE);
+
+ context = genContext(
+ KeyValue.c("a", "11")
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_notBetweenTrue() {
+ Expression expression = genExp(notBetweenExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "1")
+ );
+ eval(expression, context, Boolean.TRUE);
+
+ context = genContext(
+ KeyValue.c("a", "11")
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_notBetweenFalse() {
+ Expression expression = genExp(notBetweenExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "2")
+ );
+ eval(expression, context, Boolean.FALSE);
+
+ context = genContext(
+ KeyValue.c("a", "10")
+ );
+ eval(expression, context, Boolean.FALSE);
+
+ context = genContext(
+ KeyValue.c("a", "3")
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_isNullTrue() {
+ Expression expression = genExp(isNullExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("abc", "2")
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_isNullFalse() {
+ Expression expression = genExp(isNullExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "2")
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+ @Test
+ public void testEvaluate_isNotNullTrue() {
+ Expression expression = genExp(isNotNullExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("a", "2")
+ );
+ eval(expression, context, Boolean.TRUE);
+ }
+
+ @Test
+ public void testEvaluate_isNotNullFalse() {
+ Expression expression = genExp(isNotNullExpression);
+
+ EvaluationContext context = genContext(
+ KeyValue.c("abc", "2")
+ );
+ eval(expression, context, Boolean.FALSE);
+ }
+
+    /**
+     * Evaluates {@code expression} against {@code context} and compares the outcome with
+     * {@code result}.
+     *
+     * <p>If the evaluation throws (the stack trace is printed) or yields anything other than
+     * a {@code Boolean}, the check passes only when {@code result} is false.
+     *
+     * @param expression expression under test
+     * @param context    property values the expression is evaluated against
+     * @param result     expected outcome
+     */
+    protected void eval(Expression expression, EvaluationContext context, Boolean result) {
+        Object actual;
+        try {
+            actual = expression.evaluate(context);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            actual = null;
+        }
+
+        if (actual instanceof Boolean) {
+            assertThat(result).isEqualTo(actual);
+            return;
+        }
+
+        // null (including a failed evaluation) and non-Boolean values count as "not matched".
+        assertThat(result).isFalse();
+    }
+
+    /**
+     * Creates a {@code PropertyContext} holding the given key/value pairs.
+     *
+     * @param keyValues pairs to store; {@code null} or no pairs yields an empty context
+     * @return the populated context
+     */
+    protected EvaluationContext genContext(KeyValue... keyValues) {
+        PropertyContext ctx = new PropertyContext();
+
+        if (keyValues != null) {
+            for (int i = 0; i < keyValues.length; i++) {
+                ctx.properties.put(keyValues[i].key, keyValues[i].value);
+            }
+        }
+
+        return ctx;
+    }
+
+    /**
+     * Parses {@code exp} with {@link SelectorParser} and fails the calling test if the
+     * expression cannot be parsed or the parser returns {@code null}.
+     *
+     * @param exp selector expression text
+     * @return the parsed, non-null expression
+     */
+    protected Expression genExp(String exp) {
+        Expression expression;
+
+        try {
+            expression = SelectorParser.parse(exp);
+        } catch (MQFilterException e) {
+            // Fail with the offending expression and the parser's exception attached, rather
+            // than printing the trace and failing on an unrelated "expected true" assertion.
+            AssertionError failure = new AssertionError("Failed to parse expression: " + exp);
+            failure.initCause(e);
+            throw failure;
+        }
+
+        assertThat(expression).isNotNull();
+
+        return expression;
+    }
+
+ static class KeyValue {
+ public static KeyValue c(String key, Object value) {
+ return new KeyValue(key, value);
+ }
+
+ public KeyValue(String key, Object value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String key;
+ public Object value;
+ }
+
+ class PropertyContext implements EvaluationContext {
+
+ public Map<String, Object> properties = new HashMap<String, Object>(8);
+
+ @Override
+ public Object get(final String name) {
+ return properties.get(name);
+ }
+
+ @Override
+ public Map<String, Object> keyValues() {
+ return properties;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
new file mode 100644
index 0000000..22eeb86
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.expression.EmptyEvaluationContext;
+import org.apache.rocketmq.filter.expression.EvaluationContext;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class FilterSpiTest {
+
+ static class NothingExpression implements Expression {
+
+ @Override
+ public Object evaluate(final EvaluationContext context) throws Exception {
+ return Boolean.TRUE;
+ }
+ }
+
+ static class NothingFilter implements FilterSpi {
+ @Override
+ public Expression compile(final String expr) throws MQFilterException {
+ return new NothingExpression();
+ }
+
+ @Override
+ public String ofType() {
+ return "Nothing";
+ }
+ }
+
+
+ @Test
+ public void testRegister() {
+ FilterFactory.INSTANCE.register(new NothingFilter());
+
+ Expression expr = null;
+ try {
+ expr = FilterFactory.INSTANCE.get("Nothing").compile("abc");
+ } catch (MQFilterException e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+
+ assertThat(expr).isNotNull();
+
+ try {
+ assertThat((Boolean) expr.evaluate(new EmptyEvaluationContext())).isTrue();
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+ }
+
+ @Test
+ public void testGet() {
+ try {
+ assertThat((Boolean) FilterFactory.INSTANCE.get(ExpressionType.SQL92).compile("a is not null and a > 0")
+ .evaluate(new EmptyEvaluationContext())).isFalse();
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java
new file mode 100644
index 0000000..36ef271
--- /dev/null
+++ b/filter/src/test/java/org/apache/rocketmq/filter/ParserTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.apache.rocketmq.filter.parser.SelectorParser;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ParserTest {
+
+ private static String andExpression = "a=3 and b<>4 And c>5 AND d<=4";
+ private static String andExpressionHasBlank = "a=3 and b<>4 And c>5 AND d<=4";
+ private static String orExpression = "a=3 or b<>4 Or c>5 OR d<=4";
+ private static String inExpression = "a in ('3', '4', '5')";
+ private static String notInExpression = "(a not in ('6', '4', '5')) or (b in ('3', '4', '5'))";
+ private static String betweenExpression = "(a between 2 and 10) AND (b not between 6 and 9)";
+ private static String equalNullExpression = "a is null";
+ private static String notEqualNullExpression = "a is not null";
+ private static String nowExpression = "a <= now";
+
+ private static String invalidExpression = "a and between 2 and 10";
+ private static String illegalBetween = " a between 10 and 0";
+
+ @Test
+ public void testParse_valid() {
+ for (String expr : Arrays.asList(
+ andExpression, orExpression, inExpression, notInExpression, betweenExpression,
+ equalNullExpression, notEqualNullExpression, nowExpression
+ )) {
+
+ try {
+ Expression expression = SelectorParser.parse(expr);
+ assertThat(expression).isNotNull();
+ } catch (MQFilterException e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+
+ }
+ }
+
+ /**
+ * Parsing {@code invalidExpression} must throw {@link MQFilterException}; reaching the
+ * assertion after the parse call means no exception was thrown and fails the test.
+ */
+ @Test
+ public void testParse_invalid() {
+ try {
+ SelectorParser.parse(invalidExpression);
+
+ assertThat(Boolean.TRUE).isFalse();
+ } catch (MQFilterException e) {
+ // expected: the expression is rejected by the parser
+ }
+ }
+
+ @Test
+ public void testParse_decimalOverFlow() {
+ try {
+ String str = "100000000000000000000000";
+
+ SelectorParser.parse("a > " + str);
+
+ assertThat(Boolean.TRUE).isFalse();
+ } catch (Exception e) {
+ }
+ }
+
+    /**
+     * Parsing a comparison against an extremely long decimal literal (2048 digit chunks
+     * before and after the decimal point) must throw; reaching the assertion after the
+     * parse call means no exception was thrown and fails the test.
+     */
+    @Test
+    public void testParse_floatOverFlow() {
+        try {
+            final String chunk = "111111111111111111111111111111111111111111111111111";
+
+            // StringBuilder instead of repeated String concatenation, which re-copies the
+            // whole (ever growing) literal on each of the 4096 iterations.
+            StringBuilder str = new StringBuilder("1");
+            for (int i = 0; i < 2048; i++) {
+                str.append(chunk);
+            }
+            str.append(".");
+            for (int i = 0; i < 2048; i++) {
+                str.append(chunk);
+            }
+
+            SelectorParser.parse("a > " + str);
+
+            assertThat(Boolean.TRUE).isFalse();
+        } catch (Exception e) {
+            // expected: the oversized literal is rejected
+        }
+    }
+
+ @Test
+ public void testParse_illegalBetween() {
+ try {
+ SelectorParser.parse(illegalBetween);
+
+ assertThat(Boolean.TRUE).isFalse();
+ } catch (Exception e) {
+ }
+ }
+
+ @Test
+ public void testEquals() {
+ try {
+ Expression expr1 = SelectorParser.parse(andExpression);
+
+ Expression expr2 = SelectorParser.parse(andExpressionHasBlank);
+
+ Expression expr3 = SelectorParser.parse(orExpression);
+
+ assertThat(expr1).isEqualTo(expr2);
+ assertThat(expr1).isNotEqualTo(expr3);
+ } catch (MQFilterException e) {
+ e.printStackTrace();
+ assertThat(Boolean.TRUE).isFalse();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 47df84d..feb8b14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,6 +178,7 @@
<module>example</module>
<module>filtersrv</module>
<module>srvutil</module>
+ <module>filter</module>
<module>test</module>
<module>distribution</module>
</modules>
@@ -554,6 +555,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-filter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-example</artifactId>
<version>${project.version}</version>
@@ -603,6 +609,11 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>19.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 3269903..6dc0377 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -41,5 +41,9 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 5be8258..7841feb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -314,10 +314,11 @@ public class CommitLog {
// 17 properties
short propertiesLength = byteBuffer.getShort();
+ Map<String, String> propertiesMap = null;
if (propertiesLength > 0) {
byteBuffer.get(bytesContent, 0, propertiesLength);
String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
- Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties);
+ propertiesMap = MessageDecoder.string2messageProperties(properties);
keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
@@ -369,8 +370,9 @@ public class CommitLog {
queueOffset, // 7
keys, // 8
uniqKey, //9
- sysFlag, // 9
- preparedTransactionOffset// 10
+ sysFlag, // 10
+ preparedTransactionOffset, // 11
+ propertiesMap // 12
);
} catch (Exception e) {
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java
new file mode 100644
index 0000000..e1564a9
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLogDispatcher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+/**
+ * Dispatcher of commit log.
+ * <p>
+ * An implementation receives one {@link DispatchRequest} per call and derives its own data from it.
+ * NOTE(review): {@code DefaultMessageStore} registers implementations in its {@code dispatcherList}
+ * (consume queue building, then index building); presumably they are invoked in that order for
+ * every dispatched commit log entry - confirm against the caller.
+ * </p>
+ */
+public interface CommitLogDispatcher {
+
+ /**
+ * Handle one dispatch request.
+ *
+ * @param request the request to process
+ */
+ void dispatch(final DispatchRequest request);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 919c637..d03ff0f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ public class ConsumeQueue {
private final int mappedFileSize;
private long maxPhysicOffset = -1;
private volatile long minLogicOffset = 0;
+ private ConsumeQueueExt consumeQueueExt = null;
public ConsumeQueue(
final String topic,
@@ -61,11 +63,24 @@ public class ConsumeQueue {
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
+
+ if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
+ this.consumeQueueExt = new ConsumeQueueExt(
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
+ defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
+ defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
+ );
+ }
}
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
+ if (isExtReadEnable()) {
+ result &= this.consumeQueueExt.load();
+ }
return result;
}
@@ -82,6 +97,7 @@ public class ConsumeQueue {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
+ long maxExtAddr = 1;
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();
@@ -91,6 +107,9 @@ public class ConsumeQueue {
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset;
+ if (isExtAddr(tagsCode)) {
+ maxExtAddr = tagsCode;
+ }
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
@@ -123,6 +142,12 @@ public class ConsumeQueue {
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+ if (isExtReadEnable()) {
+ this.consumeQueueExt.recover();
+ log.info("Truncate consume queue extend file by max {}", maxExtAddr);
+ this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
+ }
}
}
@@ -200,7 +225,7 @@ public class ConsumeQueue {
int logicFileSize = this.mappedFileSize;
this.maxPhysicOffset = phyOffet - 1;
-
+ long maxExtAddr = 1;
while (true) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile != null) {
@@ -213,7 +238,7 @@ public class ConsumeQueue {
for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
- byteBuffer.getLong();
+ long tagsCode = byteBuffer.getLong();
if (0 == i) {
if (offset >= phyOffet) {
@@ -225,6 +250,10 @@ public class ConsumeQueue {
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
+ // This maybe not take effect, when not every consume queue has extend file.
+ if (isExtAddr(tagsCode)) {
+ maxExtAddr = tagsCode;
+ }
}
} else {
@@ -239,6 +268,9 @@ public class ConsumeQueue {
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
+ if (isExtAddr(tagsCode)) {
+ maxExtAddr = tagsCode;
+ }
if (pos == logicFileSize) {
return;
@@ -252,6 +284,10 @@ public class ConsumeQueue {
break;
}
}
+
+ if (isExtReadEnable()) {
+ this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
+ }
}
public long getLastOffset() {
@@ -285,7 +321,12 @@ public class ConsumeQueue {
}
public boolean flush(final int flushLeastPages) {
- return this.mappedFileQueue.flush(flushLeastPages);
+ boolean result = this.mappedFileQueue.flush(flushLeastPages);
+ if (isExtReadEnable()) {
+ result = result & this.consumeQueueExt.flush(flushLeastPages);
+ }
+
+ return result;
}
public int deleteExpiredFile(long offset) {
@@ -296,6 +337,7 @@ public class ConsumeQueue {
public void correctMinOffset(long phyMinOffset) {
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
+ long minExtAddr = 1;
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
if (result != null) {
@@ -303,12 +345,16 @@ public class ConsumeQueue {
for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = result.getByteBuffer().getLong();
result.getByteBuffer().getInt();
- result.getByteBuffer().getLong();
+ long tagsCode = result.getByteBuffer().getLong();
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
this.getMinOffsetInQueue(), this.topic, this.queueId);
+ // This maybe not take effect, when not every consume queue has extend file.
+ if (isExtAddr(tagsCode)) {
+ minExtAddr = tagsCode;
+ }
break;
}
}
@@ -319,24 +365,43 @@ public class ConsumeQueue {
}
}
}
+
+ if (isExtReadEnable()) {
+ this.consumeQueueExt.truncateByMinAddress(minExtAddr);
+ }
}
public long getMinOffsetInQueue() {
return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
}
- public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
- long logicOffset) {
+ public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
- boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
+ long tagsCode = request.getTagsCode();
+ if (isExtWriteEnable()) {
+ ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+ cqExtUnit.setFilterBitMap(request.getBitMap());
+ cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
+ cqExtUnit.setTagsCode(request.getTagsCode());
+
+ long extAddr = this.consumeQueueExt.put(cqExtUnit);
+ if (isExtAddr(extAddr)) {
+ tagsCode = extAddr;
+ } else {
+ log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
+ topic, queueId, request.getCommitLogOffset());
+ }
+ }
+ boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
+ request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
- this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
+ this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
- log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
+ log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
@@ -423,6 +488,20 @@ public class ConsumeQueue {
return null;
}
+ /**
+  * Look up the extend unit stored at {@code offset}.
+  *
+  * @param offset address passed through to {@code ConsumeQueueExt#get(long)}
+  * @return the unit, or {@code null} when this queue has no extend file or the lookup fails
+  */
+ public ConsumeQueueExt.CqExtUnit getExt(final long offset) {
+     if (!isExtReadEnable()) {
+         return null;
+     }
+     return this.consumeQueueExt.get(offset);
+ }
+
+ /**
+  * Look up the extend unit stored at {@code offset} and load it into {@code cqExtUnit}.
+  *
+  * @param offset address passed through to {@code ConsumeQueueExt#get(long, CqExtUnit)}
+  * @param cqExtUnit unit that receives the data
+  * @return {@code false} when this queue has no extend file, otherwise the result of the lookup
+  */
+ public boolean getExt(final long offset, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+     return isExtReadEnable() && this.consumeQueueExt.get(offset, cqExtUnit);
+ }
+
public long getMinLogicOffset() {
return minLogicOffset;
}
@@ -457,6 +536,9 @@ public class ConsumeQueue {
this.maxPhysicOffset = -1;
this.minLogicOffset = 0;
this.mappedFileQueue.destroy();
+ if (isExtReadEnable()) {
+ this.consumeQueueExt.destroy();
+ }
}
public long getMessageTotalInQueue() {
@@ -469,5 +551,27 @@ public class ConsumeQueue {
public void checkSelf() {
mappedFileQueue.checkSelf();
+ if (isExtReadEnable()) {
+ this.consumeQueueExt.checkSelf();
+ }
+ }
+
+ /**
+ * Whether the extend file can be read, i.e. whether a {@code ConsumeQueueExt} was created
+ * for this queue.
+ *
+ * @return {@code true} when {@code consumeQueueExt} is not null
+ */
+ protected boolean isExtReadEnable() {
+ return this.consumeQueueExt != null;
+ }
+
+ /**
+ * Whether new units may be written to the extend file: a {@code ConsumeQueueExt} must exist
+ * and the store config must currently have the consume queue extend enabled.
+ *
+ * @return {@code true} when writing to the extend file is allowed
+ */
+ protected boolean isExtWriteEnable() {
+ return this.consumeQueueExt != null
+ && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
+ }
+
+ /**
+ * Check {@code tagsCode} is address of extend file or tags code.
+ *
+ * @param tagsCode value read from (or about to be written to) the tags code slot of a consume queue unit
+ * @return {@code true} only when this queue has an extend file and
+ * {@code ConsumeQueueExt#isExtAddr(long)} accepts the value; {@code false} means it is a plain tags code
+ */
+ public boolean isExtAddr(long tagsCode) {
+ return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
new file mode 100644
index 0000000..1a177e9
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Extension of the consume queue, used to store less important data,
+ * such as message store time, filter bit map, etc.
+ * <p/>
+ * <li>1. This class is used only by {@link ConsumeQueue}.</li>
+ * <li>2. It is only weakly reliable.</li>
+ * <li>3. Be careful, the address returned is always less than 0.</li>
+ * <li>4. Please keep this file small.</li>
+ */
+public class ConsumeQueueExt {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+ private final MappedFileQueue mappedFileQueue;
+ private final String topic;
+ private final int queueId;
+
+ private final String storePath;
+ private final int mappedFileSize;
+ private ByteBuffer tempContainer;
+
+ public static final int END_BLANK_DATA_LENGTH = 4;
+
+ /**
+ * An ext address can not exceed this value (one below {@code Integer.MIN_VALUE}), for compatibility.
+ */
+ public static final long MAX_ADDR = Integer.MIN_VALUE - 1L;
+ public static final long MAX_REAL_OFFSET = MAX_ADDR - Long.MIN_VALUE;
+
+ /**
+ * Constructor.
+ * <p>
+ * Files are kept in the directory {@code storePath/topic/queueId}.
+ * </p>
+ *
+ * @param topic topic
+ * @param queueId id of queue
+ * @param storePath root dir of files to store.
+ * @param mappedFileSize file size
+ * @param bitMapLength bit map length, in bits (it is divided by {@code Byte.SIZE} below).
+ */
+ public ConsumeQueueExt(final String topic,
+ final int queueId,
+ final String storePath,
+ final int mappedFileSize,
+ final int bitMapLength) {
+
+ this.storePath = storePath;
+ this.mappedFileSize = mappedFileSize;
+
+ this.topic = topic;
+ this.queueId = queueId;
+
+ String queueDir = this.storePath
+ + File.separator + topic
+ + File.separator + queueId;
+
+ this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
+
+ // Scratch buffer reused by put(). This initial capacity only covers the bit map bytes;
+ // put() re-allocates the buffer whenever a unit does not fit.
+ if (bitMapLength > 0) {
+ this.tempContainer = ByteBuffer.allocate(
+ bitMapLength / Byte.SIZE
+ );
+ }
+ }
+
+ /**
+ * Check whether {@code address} point to extend file.
+ * <p>
+ * An address is an ext address only when it is less than or equal to {@link #MAX_ADDR}
+ * ({@code Integer.MIN_VALUE - 1}). Being negative is not enough: negative values inside the
+ * int range are NOT ext addresses.
+ * NOTE(review): presumably this keeps int-sized tags codes from being mistaken for ext
+ * addresses - confirm.
+ * </p>
+ *
+ * @param address value to test
+ * @return {@code true} if {@code address <= MAX_ADDR}
+ */
+ public boolean isExtAddr(final long address) {
+ return address <= MAX_ADDR;
+ }
+
+ /**
+ * Transform {@code address}(decorated by {@link #decorate}) to offset in mapped file.
+ * <p>
+ * if {@code address} is an ext address (see {@link #isExtAddr}), return
+ * {@code address} - {@link java.lang.Long#MIN_VALUE}, which lies in {@code [0, MAX_REAL_OFFSET]};
+ * else, just return {@code address}
+ * </p>
+ *
+ * @param address decorated ext address, or any other value
+ * @return offset in the mapped file queue for an ext address, otherwise {@code address} unchanged
+ */
+ public long unDecorate(final long address) {
+ if (isExtAddr(address)) {
+ return address - Long.MIN_VALUE;
+ }
+ return address;
+ }
+
+ /**
+ * Decorate {@code offset} from mapped file, in order to distinguish with tagsCode(saved in cq originally).
+ * <p>
+ * if {@code offset} is not already an ext address (see {@link #isExtAddr}), then return
+ * {@code offset} + {@link java.lang.Long#MIN_VALUE};
+ * else, just return {@code offset}
+ * </p>
+ * <p>
+ * The result is an ext address only for offsets in {@code [0, MAX_REAL_OFFSET]}; {@link #put}
+ * refuses to write beyond {@link #MAX_REAL_OFFSET}.
+ * </p>
+ *
+ * @param offset offset in the mapped file queue
+ * @return ext address(value is less than 0) for an offset in the range above
+ */
+ public long decorate(final long offset) {
+ if (!isExtAddr(offset)) {
+ return offset + Long.MIN_VALUE;
+ }
+ return offset;
+ }
+
+ /**
+  * Read the unit stored at {@code address} into a freshly created {@link CqExtUnit}.
+  *
+  * @param address ext address of the unit
+  * @return the unit, or {@code null} when {@link #get(long, CqExtUnit)} reports failure
+  */
+ public CqExtUnit get(final long address) {
+     final CqExtUnit unit = new CqExtUnit();
+     return get(address, unit) ? unit : null;
+ }
+
+ /**
+  * Read the unit stored at {@code address} into {@code cqExtUnit}.
+  *
+  * @param address ext address of the unit; any other value is rejected
+  * @param cqExtUnit unit that receives the data
+  * @return {@code true} when a unit was read; {@code false} when {@code address} is not an ext
+  * address, no mapped file or buffer covers it, or the unit could not be read
+  */
+ public boolean get(final long address, final CqExtUnit cqExtUnit) {
+     if (!isExtAddr(address)) {
+         return false;
+     }
+
+     final long fileOffset = unDecorate(address);
+
+     final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(fileOffset, fileOffset == 0);
+     if (null == target) {
+         return false;
+     }
+
+     final SelectMappedBufferResult selected =
+         target.selectMappedBuffer((int) (fileOffset % this.mappedFileSize));
+     if (null == selected) {
+         log.warn("[BUG] Consume queue extend unit({}) is not found!", fileOffset);
+         return false;
+     }
+
+     // always give the selected buffer back, whatever read() does
+     try {
+         return cqExtUnit.read(selected.getByteBuffer());
+     } finally {
+         selected.release();
+     }
+ }
+
+ /**
+ * Save to mapped buffer of file and return address.
+ * <p>
+ * Be careful, this method is not thread safe.
+ * </p>
+ *
+ * @param cqExtUnit unit to append
+ * @return on success the decorated address of the written unit (an ext address, so less than 0);
+ * on any failure the constant {@code 1}, which is never an ext address.
+ */
+ public long put(final CqExtUnit cqExtUnit) {
+ final int retryTimes = 3;
+ try {
+ int size = cqExtUnit.calcUnitSize();
+ // a unit larger than MAX_EXT_UNIT_SIZE is refused
+ if (size > CqExtUnit.MAX_EXT_UNIT_SIZE) {
+ log.error("Size of cq ext unit is greater than {}, {}", CqExtUnit.MAX_EXT_UNIT_SIZE, cqExtUnit);
+ return 1;
+ }
+ // refuse to grow past the largest offset that still decorates to an ext address
+ if (this.mappedFileQueue.getMaxOffset() + size > MAX_REAL_OFFSET) {
+ log.warn("Capacity of ext is maximum!{}, {}", this.mappedFileQueue.getMaxOffset(), size);
+ return 1;
+ }
+ // unit size maybe change.but, the same most of the time.
+ if (this.tempContainer == null || this.tempContainer.capacity() < size) {
+ this.tempContainer = ByteBuffer.allocate(size);
+ }
+
+ for (int i = 0; i < retryTimes; i++) {
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+
+ // NOTE(review): getLastMappedFile(0) presumably creates a new file when there is none
+ // or the last one is full - confirm against MappedFileQueue.
+ if (mappedFile == null || mappedFile.isFull()) {
+ mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+ }
+
+ if (mappedFile == null) {
+ log.error("Create mapped file when save consume queue extend, {}", cqExtUnit);
+ continue;
+ }
+ final int wrotePosition = mappedFile.getWrotePosition();
+ // the last END_BLANK_DATA_LENGTH bytes of a file are never used for unit data
+ final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH;
+
+ // check whether has enough space.
+ if (size > blankSize) {
+ // close this file with an end marker; the next iteration retries on the file after it
+ fullFillToEnd(mappedFile, wrotePosition);
+ log.info("No enough space(need:{}, has:{}) of file {}, so fill to end",
+ size, blankSize, mappedFile.getFileName());
+ continue;
+ }
+
+ // the address is the position the unit starts at, decorated into an ext address
+ if (mappedFile.appendMessage(cqExtUnit.write(this.tempContainer), 0, size)) {
+ return decorate(wrotePosition + mappedFile.getFileFromOffset());
+ }
+ }
+ } catch (Throwable e) {
+ log.error("Save consume queue extend error, " + cqExtUnit, e);
+ }
+
+ // all retries failed, or an error was logged above
+ return 1;
+ }
+
+ /**
+ * Close {@code mappedFile} for further writes: put an end marker (short {@code -1}) at
+ * {@code wrotePosition} and move the wrote position to the end of the file.
+ * <p>
+ * {@link #recover()} stops scanning a file at the first unit whose size is not positive,
+ * which includes this marker.
+ * </p>
+ *
+ * @param mappedFile file without enough space left for the next unit
+ * @param wrotePosition current wrote position of {@code mappedFile}
+ */
+ protected void fullFillToEnd(final MappedFile mappedFile, final int wrotePosition) {
+ ByteBuffer mappedFileBuffer = mappedFile.sliceByteBuffer();
+ mappedFileBuffer.position(wrotePosition);
+
+ // ending.
+ mappedFileBuffer.putShort((short) -1);
+
+ mappedFile.setWrotePosition(this.mappedFileSize);
+ }
+
+ /**
+  * Load data from file when startup.
+  *
+  * @return result of loading the underlying mapped file queue
+  */
+ public boolean load() {
+     final boolean result = this.mappedFileQueue.load();
+     // "extend" and the topic are now separated by a space, like the matching ConsumeQueue log line
+     log.info("load consume queue extend {}-{} {}", this.topic, this.queueId, result ? "OK" : "Failed");
+     return result;
+ }
+
+ /**
+ * Check whether the step size in mapped file queue is correct.
+ * <p>
+ * Delegates to {@code MappedFileQueue#checkSelf()}.
+ * </p>
+ */
+ public void checkSelf() {
+ this.mappedFileQueue.checkSelf();
+ }
+
+ /**
+ * Recover.
+ * <p>
+ * Walks every mapped file from the first one, skipping unit by unit using only the size
+ * field of each unit. The offset reached in the last file becomes the flushed and committed
+ * position of the queue, and files are truncated at that offset.
+ * </p>
+ */
+ public void recover() {
+ final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+ if (mappedFiles == null || mappedFiles.isEmpty()) {
+ return;
+ }
+
+ // load all files, consume queue will truncate extend files.
+ int index = 0;
+
+ MappedFile mappedFile = mappedFiles.get(index);
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+ long processOffset = mappedFile.getFileFromOffset();
+ // number of bytes of valid units found so far in the current file
+ long mappedFileOffset = 0;
+ CqExtUnit extUnit = new CqExtUnit();
+ while (true) {
+ // advances byteBuffer past the unit when its size is positive
+ extUnit.readBySkip(byteBuffer);
+
+ // check whether write sth.
+ if (extUnit.getSize() > 0) {
+ mappedFileOffset += extUnit.getSize();
+ continue;
+ }
+
+ // non-positive size: nothing more to read in this file, go on with the next one
+ index++;
+ if (index < mappedFiles.size()) {
+ mappedFile = mappedFiles.get(index);
+ byteBuffer = mappedFile.sliceByteBuffer();
+ processOffset = mappedFile.getFileFromOffset();
+ mappedFileOffset = 0;
+ log.info("Recover next consume queue extend file, " + mappedFile.getFileName());
+ continue;
+ }
+
+ log.info("All files of consume queue extend has been recovered over, last mapped file "
+ + mappedFile.getFileName());
+ break;
+ }
+
+ // start offset of the last file plus the bytes of units found in it
+ processOffset += mappedFileOffset;
+ this.mappedFileQueue.setFlushedWhere(processOffset);
+ this.mappedFileQueue.setCommittedWhere(processOffset);
+ this.mappedFileQueue.truncateDirtyFiles(processOffset);
+ }
+
+ /**
+  * Destroy and remove every file that ends before {@code minAddress}.
+  *
+  * @param minAddress ext address (less than 0); any other value is ignored
+  */
+ public void truncateByMinAddress(final long minAddress) {
+     if (!isExtAddr(minAddress)) {
+         return;
+     }
+
+     log.info("Truncate consume queue ext by min {}.", minAddress);
+
+     final long minOffset = unDecorate(minAddress);
+     final List<MappedFile> expired = new ArrayList<MappedFile>();
+
+     for (MappedFile candidate : this.mappedFileQueue.getMappedFiles()) {
+         final long tailOffset = candidate.getFileFromOffset() + this.mappedFileSize;
+         if (tailOffset >= minOffset) {
+             // the file still reaches the minimum offset, keep it
+             continue;
+         }
+
+         log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", candidate.getFileName(),
+             tailOffset, minOffset);
+         // only files that were really destroyed are dropped from the queue
+         if (candidate.destroy(1000)) {
+             expired.add(candidate);
+         }
+     }
+
+     this.mappedFileQueue.deleteExpiredFile(expired);
+ }
+
+ /**
+  * Truncate the mapped file queue right after the unit stored at {@code maxAddress}.
+  * Nothing is truncated when that unit cannot be read.
+  *
+  * @param maxAddress ext address (less than 0) of the last unit to keep; any other value is ignored
+  */
+ public void truncateByMaxAddress(final long maxAddress) {
+     if (!isExtAddr(maxAddress)) {
+         return;
+     }
+
+     log.info("Truncate consume queue ext by max {}.", maxAddress);
+
+     final CqExtUnit lastUnit = get(maxAddress);
+     if (lastUnit != null) {
+         // keep everything up to and including the last unit
+         this.mappedFileQueue.truncateDirtyFiles(unDecorate(maxAddress) + lastUnit.getSize());
+         return;
+     }
+
+     log.error("[BUG] address {} of consume queue extend not found!", maxAddress);
+ }
+
+ /**
+ * flush buffer to file.
+ *
+ * @param flushLeastPages passed through to {@code MappedFileQueue#flush(int)}
+ * @return the value returned by {@code MappedFileQueue#flush(int)}
+ */
+ public boolean flush(final int flushLeastPages) {
+ return this.mappedFileQueue.flush(flushLeastPages);
+ }
+
+ /**
+ * delete files and directory.
+ * <p>
+ * Delegates to {@code MappedFileQueue#destroy()}.
+ * </p>
+ */
+ public void destroy() {
+ this.mappedFileQueue.destroy();
+ }
+
+ /**
+  * Max address (value is less than 0): the decorated end of the data written to the last file,
+  * or the decorated offset 0 when there is no file yet.
+  * <p>
+  * Be careful: the value is only a snapshot taken when this method is invoked.
+  * </p>
+  *
+  * @return decorated address just behind the last written byte
+  */
+ public long getMaxAddress() {
+     final MappedFile last = this.mappedFileQueue.getLastMappedFile();
+     final long endOffset = last == null
+         ? 0
+         : last.getFileFromOffset() + last.getWrotePosition();
+     return decorate(endOffset);
+ }
+
+ /**
+  * Minimum address saved in file: the decorated start offset of the first file,
+  * or the decorated offset 0 when there is no file yet.
+  *
+  * @return decorated address of the beginning of the first file
+  */
+ public long getMinAddress() {
+     final MappedFile first = this.mappedFileQueue.getFirstMappedFile();
+     final long startOffset = first == null ? 0 : first.getFileFromOffset();
+     return decorate(startOffset);
+ }
+
+ /**
+  * Store unit.
+  * <p>
+  * Serialized layout, in this order: {@code size} (2 bytes), {@code tagsCode} (8 bytes),
+  * {@code msgStoreTime} (8 bytes), {@code bitMapSize} (2 bytes), {@code filterBitMap}
+  * ({@code bitMapSize} bytes).
+  * </p>
+  */
+ public static class CqExtUnit {
+     public static final short MIN_EXT_UNIT_SIZE
+         = 2 * 1 // size, 32k max
+         + 8 * 2 // msg time + tagCode
+         + 2; // bitMapSize
+
+     public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE;
+
+     public CqExtUnit() {}
+
+     /**
+      * Create a unit with all data set.
+      *
+      * @param tagsCode hash code of tags, {@code null} is stored as 0
+      * @param msgStoreTime the time the message was stored into commit log
+      * @param filterBitMap filter bit map, may be {@code null}
+      */
+     public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) {
+         this.tagsCode = tagsCode == null ? 0 : tagsCode;
+         this.msgStoreTime = msgStoreTime;
+         this.filterBitMap = filterBitMap;
+         this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
+         this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);
+     }
+
+     /**
+      * unit size
+      */
+     private short size;
+     /**
+      * hash code of tags
+      */
+     private long tagsCode;
+     /**
+      * the time to store into commit log of message
+      */
+     private long msgStoreTime;
+     /**
+      * size of bit map
+      */
+     private short bitMapSize;
+     /**
+      * filter bit map
+      */
+     private byte[] filterBitMap;
+
+     /**
+      * build unit from buffer from current position.
+      * <p>
+      * Every read is checked against the bytes remaining in {@code buffer}, so a truncated or
+      * corrupt unit makes this method return {@code false} instead of throwing
+      * {@link java.nio.BufferUnderflowException}.
+      * </p>
+      *
+      * @param buffer buffer positioned at the first byte of a unit
+      * @return {@code true} when a complete unit was read
+      */
+     private boolean read(final ByteBuffer buffer) {
+         if (buffer.remaining() < 2) {
+             return false;
+         }
+
+         this.size = buffer.getShort();
+
+         if (this.size < 1) {
+             return false;
+         }
+
+         // the fixed part behind the size field: tagsCode + msgStoreTime + bitMapSize
+         if (buffer.remaining() < MIN_EXT_UNIT_SIZE - 2) {
+             return false;
+         }
+
+         this.tagsCode = buffer.getLong();
+         this.msgStoreTime = buffer.getLong();
+         this.bitMapSize = buffer.getShort();
+
+         if (this.bitMapSize < 1) {
+             return true;
+         }
+
+         if (buffer.remaining() < this.bitMapSize) {
+             // bit map is cut off; do not expose a stale bit map through getFilterBitMap()
+             this.bitMapSize = 0;
+             return false;
+         }
+
+         if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) {
+             this.filterBitMap = new byte[bitMapSize];
+         }
+
+         buffer.get(this.filterBitMap);
+         return true;
+     }
+
+     /**
+      * Only read first 2 byte to get unit size.
+      * <p>
+      * if size &gt; 0, then skip buffer position with size.
+      * </p>
+      * <p>
+      * if size &lt;= 0, nothing to do.
+      * </p>
+      * <p>
+      * When fewer than 2 bytes remain, or the size read points behind the end of
+      * {@code buffer}, the size is set to 0 and the position is left untouched, so the caller
+      * sees "no more data" instead of an exception.
+      * </p>
+      *
+      * @param buffer buffer positioned at the first byte of a unit
+      */
+     private void readBySkip(final ByteBuffer buffer) {
+         if (buffer.remaining() < 2) {
+             this.size = 0;
+             return;
+         }
+
+         // peek through a slice so that the position of buffer does not move
+         final short tempSize = buffer.slice().getShort();
+
+         if (tempSize > buffer.remaining()) {
+             log.warn("Size {} of cq ext unit exceeds remaining {} bytes, treat as end of data",
+                 tempSize, buffer.remaining());
+             this.size = 0;
+             return;
+         }
+
+         this.size = tempSize;
+
+         if (tempSize > 0) {
+             buffer.position(buffer.position() + tempSize);
+         }
+     }
+
+     /**
+      * Transform unit data to byte array.
+      * <p/>
+      * <li>1. {@code container} can be null, it will be created if null.</li>
+      * <li>2. if capacity of {@code container} is less than unit size, it will be created also.</li>
+      * <li>3. Pls be sure that size of unit is not greater than {@link #MAX_EXT_UNIT_SIZE}</li>
+      * <li>4. The returned array is the backing array of the buffer used; only its first
+      * {@link #getSize()} bytes belong to this unit.</li>
+      *
+      * @param container buffer to reuse, may be {@code null}
+      * @return backing array holding the serialized unit at index 0
+      */
+     private byte[] write(final ByteBuffer container) {
+         this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
+         this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);
+
+         ByteBuffer temp = container;
+
+         if (temp == null || temp.capacity() < this.size) {
+             temp = ByteBuffer.allocate(this.size);
+         }
+
+         // start writing at index 0 again, whatever was written to a reused container before
+         temp.clear();
+         temp.limit(this.size);
+
+         temp.putShort(this.size);
+         temp.putLong(this.tagsCode);
+         temp.putLong(this.msgStoreTime);
+         temp.putShort(this.bitMapSize);
+         if (this.bitMapSize > 0) {
+             temp.put(this.filterBitMap);
+         }
+
+         return temp.array();
+     }
+
+     /**
+      * Calculate unit size by current data.
+      *
+      * @return {@link #MIN_EXT_UNIT_SIZE} plus the length of the bit map, as int so that an
+      * oversized bit map can be detected
+      */
+     private int calcUnitSize() {
+         int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length);
+         return sizeTemp;
+     }
+
+     public long getTagsCode() {
+         return tagsCode;
+     }
+
+     public void setTagsCode(final long tagsCode) {
+         this.tagsCode = tagsCode;
+     }
+
+     public long getMsgStoreTime() {
+         return msgStoreTime;
+     }
+
+     public void setMsgStoreTime(final long msgStoreTime) {
+         this.msgStoreTime = msgStoreTime;
+     }
+
+     /**
+      * @return the filter bit map, or {@code null} when the bit map size is less than 1
+      */
+     public byte[] getFilterBitMap() {
+         if (this.bitMapSize < 1) {
+             return null;
+         }
+         return filterBitMap;
+     }
+
+     public void setFilterBitMap(final byte[] filterBitMap) {
+         this.filterBitMap = filterBitMap;
+         // not safe transform, but size will be calculate by #calcUnitSize
+         this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
+     }
+
+     public short getSize() {
+         return size;
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) return true;
+         if (!(o instanceof CqExtUnit)) return false;
+
+         CqExtUnit cqExtUnit = (CqExtUnit) o;
+
+         if (bitMapSize != cqExtUnit.bitMapSize) return false;
+         if (msgStoreTime != cqExtUnit.msgStoreTime) return false;
+         if (size != cqExtUnit.size) return false;
+         if (tagsCode != cqExtUnit.tagsCode) return false;
+         if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap)) return false;
+
+         return true;
+     }
+
+     @Override
+     public int hashCode() {
+         int result = (int) size;
+         result = 31 * result + (int) (tagsCode ^ (tagsCode >>> 32));
+         result = 31 * result + (int) (msgStoreTime ^ (msgStoreTime >>> 32));
+         result = 31 * result + (int) bitMapSize;
+         result = 31 * result + (filterBitMap != null ? Arrays.hashCode(filterBitMap) : 0);
+         return result;
+     }
+
+     @Override
+     public String toString() {
+         return "CqExtUnit{" +
+             "size=" + size +
+             ", tagsCode=" + tagsCode +
+             ", msgStoreTime=" + msgStoreTime +
+             ", bitMapSize=" + bitMapSize +
+             ", filterBitMap=" + Arrays.toString(filterBitMap) +
+             '}';
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
index 1350026..9db87f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
@@ -18,26 +18,33 @@ package org.apache.rocketmq.store;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
public class DefaultMessageFilter implements MessageFilter {
- @Override
- public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
- if (tagsCode == null) {
- return true;
- }
+ private SubscriptionData subscriptionData;
- if (null == subscriptionData) {
- return true;
- }
+ public DefaultMessageFilter(final SubscriptionData subscriptionData) {
+ this.subscriptionData = subscriptionData;
+ }
- if (subscriptionData.isClassFilterMode())
+ @Override
+ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+ if (null == tagsCode || null == subscriptionData) {
return true;
+ }
- if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
+ if (subscriptionData.isClassFilterMode()) {
return true;
}
- return subscriptionData.getCodeSet().contains(tagsCode.intValue());
+ return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
+ || subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
+ @Override
+ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0edfeec..7bed62c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -41,7 +42,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.config.BrokerRole;
@@ -60,8 +60,6 @@ import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private final MessageFilter messageFilter = new DefaultMessageFilter();
-
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;
@@ -103,6 +101,8 @@ public class DefaultMessageStore implements MessageStore {
private AtomicLong printTimes = new AtomicLong(0);
+ private final LinkedList<CommitLogDispatcher> dispatcherList;
+
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
@@ -133,6 +133,10 @@ public class DefaultMessageStore implements MessageStore {
this.allocateMappedFileService.start();
this.indexService.start();
+
+ this.dispatcherList = new LinkedList<>();
+ this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
+ this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
}
public void truncateDirtyLogicFiles(long phyOffset) {
@@ -409,7 +413,7 @@ public class DefaultMessageStore implements MessageStore {
}
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
- final SubscriptionData subscriptionData) {
+ final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
@@ -464,6 +468,7 @@ public class DefaultMessageStore implements MessageStore {
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
+ ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
@@ -483,29 +488,51 @@ public class DefaultMessageStore implements MessageStore {
break;
}
- if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
- SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
- if (selectResult != null) {
- this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
- getResult.addMessage(selectResult);
- status = GetMessageStatus.FOUND;
- nextPhyFileStartOffset = Long.MIN_VALUE;
+ boolean extRet = false;
+ if (consumeQueue.isExtAddr(tagsCode)) {
+ extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
+ if (extRet) {
+ tagsCode = cqExtUnit.getTagsCode();
} else {
- if (getResult.getBufferTotalSize() == 0) {
- status = GetMessageStatus.MESSAGE_WAS_REMOVING;
- }
-
- nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
+ // can't find ext content. Client will filter messages by tag also.
+ log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
+ tagsCode, offsetPy, sizePy, topic, group);
}
- } else {
+ }
+
+ if (messageFilter != null
+ && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
- if (log.isDebugEnabled()) {
- log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
+ continue;
+ }
+
+ SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
+ if (null == selectResult) {
+ if (getResult.getBufferTotalSize() == 0) {
+ status = GetMessageStatus.MESSAGE_WAS_REMOVING;
+ }
+
+ nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
+ continue;
+ }
+
+ if (messageFilter != null
+ && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
+ if (getResult.getBufferTotalSize() == 0) {
+ status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
+ // release...
+ selectResult.release();
+ continue;
}
+
+ this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
+ getResult.addMessage(selectResult);
+ status = GetMessageStatus.FOUND;
+ nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
@@ -1318,27 +1345,14 @@ public class DefaultMessageStore implements MessageStore {
}
public void doDispatch(DispatchRequest req) {
- final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
- switch (tranType) {
- case MessageSysFlag.TRANSACTION_NOT_TYPE:
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
- DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
- req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
- break;
- case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
- break;
- }
-
- if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
- DefaultMessageStore.this.indexService.buildIndex(req);
+ for (CommitLogDispatcher dispatcher : this.dispatcherList) { // hand the request to every dispatcher, in list order
+ dispatcher.dispatch(req); // the same request object is passed to each dispatcher
}
}
- public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
- long logicOffset) {
- ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
- cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
+ public void putMessagePositionInfo(DispatchRequest dispatchRequest) { // takes the whole request instead of its individual fields
+ ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); // queue is resolved from the request's topic and queue id
+ cq.putMessagePositionInfoWrapper(dispatchRequest); // the consume queue receives the complete request
}
public BrokerStatsManager getBrokerStatsManager() {
@@ -1354,6 +1368,20 @@ public class DefaultMessageStore implements MessageStore {
return remainTransientStoreBufferNumbs() == 0;
}
+ @Override
+ public LinkedList<CommitLogDispatcher> getDispatcherList() { // accessor for the dispatchers iterated by doDispatch
+ return this.dispatcherList; // returns the live list itself, not a copy
+ }
+
+ @Override
+ public ConsumeQueue getConsumeQueue(String topic, int queueId) { // pure lookup in consumeQueueTable: nothing is created here
+ ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); // per-topic table keyed by queue id
+ if (map == null) {
+ return null; // no table is registered for this topic
+ }
+ return map.get(queueId); // null when the topic's table has no entry for queueId
+ }
+
public void unlockMappedFile(final MappedFile mappedFile) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
@@ -1363,6 +1391,33 @@ public class DefaultMessageStore implements MessageStore {
}, 6, TimeUnit.SECONDS);
}
+ class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { // dispatcher that forwards requests to putMessagePositionInfo
+
+ @Override
+ public void dispatch(DispatchRequest request) {
+ final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); // transaction type is derived from the request's sys flag
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // non-transactional and committed messages are forwarded
+ DefaultMessageStore.this.putMessagePositionInfo(request);
+ break;
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // prepared and rolled-back messages are skipped
+ break;
+ }
+ }
+ }
+
+ class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { // dispatcher that forwards requests to the index service
+
+ @Override
+ public void dispatch(DispatchRequest request) {
+ if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { // does nothing when message indexing is disabled in the store config
+ DefaultMessageStore.this.indexService.buildIndex(request);
+ }
+ }
+ }
+
class CleanCommitLogService {
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
@@ -1695,7 +1750,8 @@ public class DefaultMessageStore implements MessageStore {
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode());
+ dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+ dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// FIXED BUG By shijia
this.reputFromOffset += size;