You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/21 10:34:09 UTC
kylin git commit: KYLIN-1919 performance enhancement
Repository: kylin
Updated Branches:
refs/heads/master 5156ccd47 -> 3142c74cb
KYLIN-1919 performance enhancement
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3142c74c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3142c74c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3142c74c
Branch: refs/heads/master
Commit: 3142c74cb22ea2b80ba36de462da69197020e1b3
Parents: 5156ccd
Author: shaofengshi <sh...@apache.org>
Authored: Fri Oct 21 18:30:44 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Oct 21 18:32:52 2016 +0800
----------------------------------------------------------------------
.../kylin/source/kafka/StreamingParser.java | 49 +++--
.../source/kafka/TimedJsonStreamParser.java | 31 ++--
.../test/java/TimedJsonStreamParserTest.java | 178 -------------------
.../source/kafka/TimedJsonStreamParserTest.java | 166 +++++++++++++++++
source-kafka/src/test/resources/message.json | 10 +-
5 files changed, 217 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 43b2ac5..75f9c4b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -21,7 +21,6 @@ package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
@@ -31,7 +30,6 @@ import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
-import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,15 +45,15 @@ public abstract class StreamingParser {
public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
public static final Map<String, String> defaultProperties = Maps.newHashMap();
- public static final Set derivedTimeColumns = Sets.newHashSet();
+ public static final Map<String, Integer> derivedTimeColumns = Maps.newHashMap();
static {
- derivedTimeColumns.add("minute_start");
- derivedTimeColumns.add("hour_start");
- derivedTimeColumns.add("day_start");
- derivedTimeColumns.add("week_start");
- derivedTimeColumns.add("month_start");
- derivedTimeColumns.add("quarter_start");
- derivedTimeColumns.add("year_start");
+ derivedTimeColumns.put("minute_start", 1);
+ derivedTimeColumns.put("hour_start", 2);
+ derivedTimeColumns.put("day_start", 3);
+ derivedTimeColumns.put("week_start", 4);
+ derivedTimeColumns.put("month_start", 5);
+ derivedTimeColumns.put("quarter_start", 6);
+ derivedTimeColumns.put("year_start", 7);
defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
@@ -108,32 +106,45 @@ public abstract class StreamingParser {
* @return true if the columnName is a derived time column; otherwise false;
*/
public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) {
- if (derivedTimeColumns.contains(columnName) == false)
+
+ Integer derivedTimeColumn = derivedTimeColumns.get(columnName);
+ if (derivedTimeColumn == null) {
return false;
+ }
long normalized = 0;
- if (columnName.equals("minute_start")) {
+ switch (derivedTimeColumn) {
+ case 1:
normalized = TimeUtil.getMinuteStart(t);
result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
- } else if (columnName.equals("hour_start")) {
+ break;
+ case 2:
normalized = TimeUtil.getHourStart(t);
result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
- } else if (columnName.equals("day_start")) {
- //from day_start on, formatTs will output date format
+ break;
+ case 3:
normalized = TimeUtil.getDayStart(t);
result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equals("week_start")) {
+ break;
+ case 4:
normalized = TimeUtil.getWeekStart(t);
result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equals("month_start")) {
+ break;
+ case 5:
normalized = TimeUtil.getMonthStart(t);
result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equals("quarter_start")) {
+ break;
+ case 6:
normalized = TimeUtil.getQuarterStart(t);
result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equals("year_start")) {
+ break;
+ case 7:
normalized = TimeUtil.getYearStart(t);
result.add(DateFormat.formatToDateStr(normalized));
+ break;
+ default:
+ throw new IllegalStateException();
+
}
return true;
http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 633a30c..e00ce16 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -64,6 +64,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
private String tsColName = null;
private String tsParser = null;
private String separator = null;
+ private final Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ private final Map<String, String[]> nameMap = new HashMap<>();
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
@@ -100,15 +103,14 @@ public final class TimedJsonStreamParser extends StreamingParser {
public StreamingMessage parse(ByteBuffer buffer) {
try {
Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
- Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ root.clear();
root.putAll(message);
String tsStr = objToString(root.get(tsColName));
long t = streamTimeParser.parseTime(tsStr);
ArrayList<String> result = Lists.newArrayList();
for (TblColRef column : allColumns) {
- String columnName = column.getName();
- columnName = columnName.toLowerCase();
+ final String columnName = column.getName().toLowerCase();
if (populateDerivedTimeColumns(columnName, result, t) == false) {
result.add(getValueByKey(columnName, root));
}
@@ -126,18 +128,24 @@ public final class TimedJsonStreamParser extends StreamingParser {
return true;
}
- protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
- if (root.containsKey(key)) {
- return objToString(root.get(key));
+ protected String getValueByKey(String key, Map<String, Object> rootMap) throws IOException {
+ if (rootMap.containsKey(key)) {
+ return objToString(rootMap.get(key));
}
- if (key.contains(separator)) {
- String[] names = key.toLowerCase().split(separator);
- Map<String, Object> tempMap = root;
+ String[] names = nameMap.get(key);
+ if (names == null && key.contains(separator)) {
+ names = key.toLowerCase().split(separator);
+ nameMap.put(key, names);
+ }
+
+ if (names != null && names.length > 0) {
+ tempMap.clear();
+ tempMap.putAll(rootMap);
for (int i = 0; i < names.length - 1; i++) {
- Object o = root.get(names[i]);
+ Object o = tempMap.get(names[i]);
if (o instanceof Map) {
- tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ tempMap.clear();
tempMap.putAll((Map<String, Object>) o);
} else {
throw new IOException("Property '" + names[i] + "' is not embedded format");
@@ -145,7 +153,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
Object finalObject = tempMap.get(names[names.length - 1]);
return objToString(finalObject);
-
}
return StringUtils.EMPTY;
http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
deleted file mode 100644
index 5a52b61..0000000
--- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import com.fasterxml.jackson.databind.JavaType;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.HashMap;
-import java.util.ArrayList;
-
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-
-
-public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
-
- private static String[] userNeedColNames;
-
- private static final String jsonFilePath = "src/test/resources/message.json";
-
- private static ObjectMapper mapper;
-
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
- SimpleType.construct(Object.class));
-
-
- @BeforeClass
- public static void setUp() throws Exception {
- staticCreateTestMetadata();
- mapper = new ObjectMapper();
- }
-
- @AfterClass
- public static void after() throws Exception {
- cleanAfterClass();
- }
-
-
- @Test
- public void testNormalValue() throws Exception {
- userNeedColNames = new String[]{"createdAt", "id", "isTruncated", "text"};
- List<TblColRef> allCol = mockupTblColRefList();
- TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
- Object msg = mapper.readValue(new File(jsonFilePath), mapType);
- ByteBuffer buffer = getJsonByteBuffer(msg);
- StreamingMessage sMsg = parser.parse(buffer);
- List<String> result = sMsg.getData();
- assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
- assertEquals("755703618762862600", result.get(1));
- assertEquals("false", result.get(2));
- assertEquals("dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup", result.get(3));
- }
-
- @Test
- public void testEmbeddedValue() throws Exception {
- userNeedColNames = new String[]{"user_id", "user_description", "user_isProtected"};
- List<TblColRef> allCol = mockupTblColRefList();
- TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
- Object msg = mapper.readValue(new File(jsonFilePath), mapType);
- ByteBuffer buffer = getJsonByteBuffer(msg);
- StreamingMessage sMsg = parser.parse(buffer);
- List<String> result = sMsg.getData();
- assertEquals("4853763947", result.get(0));
- assertEquals("Noticias, an\ufffd\ufffdlisis e informaci\ufffd\ufffdn para el crecimiento de la regi\ufffd\ufffdn.", result.get(1));
- assertEquals("false", result.get(2));
- }
-
- @Test
- public void testArrayValue() throws Exception {
- userNeedColNames = new String[]{"userMentionEntities", "mediaEntities"};
- List<TblColRef> allCol = mockupTblColRefList();
- TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
- Object msg = mapper.readValue(new File(jsonFilePath), mapType);
- HashMap<String, Object> map = (HashMap<String, Object>) msg;
- Object array = map.get("mediaEntities");
- ByteBuffer buffer = getJsonByteBuffer(msg);
- StreamingMessage sMsg = parser.parse(buffer);
- List<String> result = sMsg.getData();
- System.out.println(result);
-
- }
-
- @Test
- public void testMapValue() throws Exception {
- userNeedColNames = new String[]{"user"};
- List<TblColRef> allCol = mockupTblColRefList();
- TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
- Object msg = mapper.readValue(new File(jsonFilePath), mapType);
- ByteBuffer buffer = getJsonByteBuffer(msg);
- StreamingMessage sMsg = parser.parse(buffer);
- List<String> result = sMsg.getData();
- System.out.println("result:" + result);
-
- }
-
- @Test
- public void testNullKey() throws Exception {
- userNeedColNames = new String[]{"null", ""};
- List<TblColRef> allCol = mockupTblColRefList();
- TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
- Object msg = mapper.readValue(new File(jsonFilePath), mapType);
- ByteBuffer buffer = getJsonByteBuffer(msg);
- StreamingMessage sMsg = parser.parse(buffer);
- List<String> result = sMsg.getData();
- assertEquals(StringUtils.EMPTY, result.get(0));
- assertEquals(StringUtils.EMPTY, result.get(1));
- }
-
-
- private static ByteBuffer getJsonByteBuffer(Object obj) throws IOException {
- byte[] bytes = mapper.writeValueAsBytes(obj);
- ByteBuffer buff = ByteBuffer.wrap(bytes);
- buff.position(0);
- return buff;
- }
-
-
- private static List<TblColRef> mockupTblColRefList() {
- TableDesc t = mockupTableDesc("table_a");
- List<TblColRef> list = new ArrayList<>();
- for (int i = 0; i < userNeedColNames.length; i++) {
- ColumnDesc c = mockupColumnDesc(t, i, userNeedColNames[i], "string");
- list.add(c.getRef());
- }
- return list;
- }
-
- private static TableDesc mockupTableDesc(String tableName) {
- TableDesc mockup = new TableDesc();
- mockup.setName(tableName);
- return mockup;
- }
-
- private static ColumnDesc mockupColumnDesc(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
- ColumnDesc desc = new ColumnDesc();
- String id = "" + oneBasedColumnIndex;
- desc.setId(id);
- desc.setName(name);
- desc.setDatatype(datatype);
- desc.init(table);
- return desc;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
new file mode 100644
index 0000000..f92a24e
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+
+public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
+
+ private static String[] userNeedColNames;
+ private static final String jsonFilePath = "src/test/resources/message.json";
+ private static ObjectMapper mapper;
+ private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ staticCreateTestMetadata();
+ mapper = new ObjectMapper();
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ cleanAfterClass();
+ }
+
+ @Test
+ public void testNormalValue() throws Exception {
+ userNeedColNames = new String[] { "createdAt", "id", "isTruncated", "text" };
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
+ assertEquals("755703618762862600", result.get(1));
+ assertEquals("false", result.get(2));
+ assertEquals("dejamos", result.get(3));
+ }
+
+ @Test
+ public void testEmbeddedValue() throws Exception {
+ userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected" };
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ assertEquals("4853763947", result.get(0));
+ assertEquals("Noticias", result.get(1));
+ assertEquals("false", result.get(2));
+ }
+
+ @Test
+ public void testArrayValue() throws Exception {
+ userNeedColNames = new String[] { "userMentionEntities", "mediaEntities" };
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ HashMap<String, Object> map = (HashMap<String, Object>) msg;
+ Object array = map.get("mediaEntities");
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ System.out.println(result);
+
+ }
+
+ @Test
+ public void testMapValue() throws Exception {
+ userNeedColNames = new String[] { "user" };
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+
+ }
+
+ @Test
+ public void testNullKey() throws Exception {
+ userNeedColNames = new String[] { "null", "" };
+ List<TblColRef> allCol = mockupTblColRefList();
+ TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+ Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+ ByteBuffer buffer = getJsonByteBuffer(msg);
+ StreamingMessage sMsg = parser.parse(buffer);
+ List<String> result = sMsg.getData();
+ assertEquals(StringUtils.EMPTY, result.get(0));
+ assertEquals(StringUtils.EMPTY, result.get(1));
+ }
+
+ private static ByteBuffer getJsonByteBuffer(Object obj) throws IOException {
+ byte[] bytes = mapper.writeValueAsBytes(obj);
+ ByteBuffer buff = ByteBuffer.wrap(bytes);
+ buff.position(0);
+ return buff;
+ }
+
+ private static List<TblColRef> mockupTblColRefList() {
+ TableDesc t = mockupTableDesc("table_a");
+ List<TblColRef> list = new ArrayList<>();
+ for (int i = 0; i < userNeedColNames.length; i++) {
+ ColumnDesc c = mockupColumnDesc(t, i, userNeedColNames[i], "string");
+ list.add(c.getRef());
+ }
+ return list;
+ }
+
+ private static TableDesc mockupTableDesc(String tableName) {
+ TableDesc mockup = new TableDesc();
+ mockup.setName(tableName);
+ return mockup;
+ }
+
+ private static ColumnDesc mockupColumnDesc(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+ ColumnDesc desc = new ColumnDesc();
+ String id = "" + oneBasedColumnIndex;
+ desc.setId(id);
+ desc.setName(name);
+ desc.setDatatype(datatype);
+ desc.init(table);
+ return desc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json
index dfafd45..55f35d3 100644
--- a/source-kafka/src/test/resources/message.json
+++ b/source-kafka/src/test/resources/message.json
@@ -1,8 +1,7 @@
{
"createdAt": "Jul 20, 2016 9:59:17 AM",
"id": 755703618762862600,
- "text": "dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup",
- "source": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
+ "text": "dejamos",
"isTruncated": false,
"inReplyToStatusId": -1,
"inReplyToUserId": -1,
@@ -15,8 +14,6 @@
"mediaEntities": [
{
"id": 755703584084328400,
- "url": "https://t.co/kfe0kT2Fup",
- "displayURL": "pic.twitter.com/kfe0kT2Fup",
"sizes": {
"0": {
"width": 150,
@@ -38,11 +35,8 @@
"currentUserRetweetId": -1,
"user": {
"id": 4853763947,
- "name": "El Metropolitano",
- "screenName": "ElTWdelMetro",
- "description": "Noticias, an\ufffd\ufffdlisis e informaci\ufffd\ufffdn para el crecimiento de la regi\ufffd\ufffdn.",
+ "description": "Noticias",
"isDefaultProfileImage": false,
- "url": "http://elmetropolitano.com.ar/",
"isProtected": false
}
}
\ No newline at end of file