Posted to user@phoenix.apache.org by "datta.mane2@gmail.com" <da...@gmail.com> on 2017/10/19 13:45:18 UTC
Phoenix tables not updated using the Phoenix sink in Apache Flume
Hello,
I am trying to transfer data from my local SQL database table to Phoenix on an EC2 instance.
I do not see any errors in the logs, but data is not inserted or updated into the Phoenix tables.
Any help would be really appreciated.
$bin/flume-ng agent --conf conf --conf-file conf/stsflume-mysql-phoenix.conf --name agent1 -Dflume.root.logger=DEBUG,console
Below is the configuration:
agent1.channels.ph1.type = memory
agent1.sources.sql-source.channels = ph1
agent1.channels = ph1
agent1.sinks = PHOENIX
agent1.sources = sql-source
agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource
# URL to connect to database (currently only mysql is supported)
agent1.sources.sql-source.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/my_schema
# Hibernate Database connection properties
agent1.sources.sql-source.hibernate.connection.user = root
agent1.sources.sql-source.hibernate.connection.password = root
agent1.sources.sql-source.hibernate.connection.autocommit = true
agent1.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
agent1.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sql-source.table = dwki_spelling_stage_summary
# Columns to select from the source table (default * imports the entire row)
agent1.sources.sql-source.columns.to.select = "profile_id","class_id","school_id","district_id","time_period_code","spelling_stage"
# Incremental column properties
# Incremental value is the point from which to start reading data from the table (0 will import the entire table)
agent1.sources.sql-source.incremental.value = 0
# Query delay: the query will be run every configured number of milliseconds
agent1.sources.sql-source.run.query.delay=100
# Status file is used to save the last read row
agent1.sources.sql-source.status.file.path = /var/lib/flume
agent1.sources.sql-source.status.file.name = sql-source.status
agent1.sinks.PHOENIX.channel = ph1
agent1.sinks.PHOENIX.type = org.apache.phoenix.flume.sink.PhoenixSink
agent1.sinks.PHOENIX.batchSize = 100
agent1.sinks.PHOENIX.zookeeperQuorum=ip-11-46-175-227.ec2.internal
#agent1.sinks.PHOENIX.jdbcUrl=jdbc:phoenix:thin:url=http://ip-11-46-174-220.ec2.internal:8765;serialization=PROTOBUF;authentication=SPNEGO
agent1.sinks.PHOENIX.jdbcUrl=jdbc:phoenix:thin:url=http://ip-11-46-174-220.ec2.internal:8765;serialization=PROTOBUF
agent1.sinks.PHOENIX.table="dwki_spelling_stage_summary1"
agent1.sinks.PHOENIX.serializer=regex
agent1.sinks.PHOENIX.serializer.regex=(.*)
agent1.sinks.PHOENIX.serializer.columns="profile_id","class_id","school_id","district_id","time_period_code","spelling_stage","creation_date","modified_date"
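One thing worth double-checking in the serializer settings: the regex serializer maps one capture group per configured column, but (.*) contains only a single capture group while serializer.columns lists eight columns. If the SQL source emits comma-separated rows (an assumption — check the actual event body format), a per-column pattern along these lines would make the group count line up with the column count:

```
# Hypothetical sketch, NOT the tested config: one capture group per
# configured column, assuming comma-separated event bodies
agent1.sinks.PHOENIX.serializer.regex=([^,]*),([^,]*),([^,]*),([^,]*),([^,]*),([^,]*),([^,]*),([^,]*)
```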
I know it is reading from the source because I can see sql-source.status updated every time a record is inserted.
Is there anything I am missing?
In the logs I keep seeing this:
2017-10-19 00:43:49,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.phoenix.flume.serializer.RegexEventSerializer.upsertEvents(RegexEventSerializer.java:90)] payload 1 size doesn't match the pattern 6
2017-10-19 00:43:49,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.phoenix.flume.serializer.RegexEventSerializer.upsertEvents(RegexEventSerializer.java:90)] payload 1 size doesn't match the pattern 6
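The repeated DEBUG line above suggests a group-count mismatch: the serializer compares what the regex extracts from each event body against the number of columns it expects. A minimal sketch of that mismatch using plain Python re (not the Flume classes; the sample body and the comma-separated format are assumptions):

```python
import re

# Columns configured on the sink in the config above
columns = ["profile_id", "class_id", "school_id", "district_id",
           "time_period_code", "spelling_stage", "creation_date", "modified_date"]

# Hypothetical event body, assuming comma-separated fields
body = "111,111c,111s,111d,b,A,2017-10-19,2017-10-19"

# The configured pattern (.*) has a single capture group, so for any
# event body it can only ever yield one value
single_group = re.match(r"(.*)", body)
print(len(single_group.groups()))   # 1, regardless of how many fields body has

# A pattern with one group per column yields one value per column instead
per_column = re.match(r"([^,]*)," * (len(columns) - 1) + r"([^,]*)", body)
print(len(per_column.groups()))     # 8, matching the configured column count
```

This is consistent with the "payload 1 size doesn't match the pattern" message: one captured value where several are expected, so the event is skipped without a hard error.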
Below are the table details:
0: jdbc:phoenix:thin:url=http://localhost:876> !describe "dwki_spelling_stage_summary1";
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+--+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | COLUMN_NAME | DATA_TYPE | TYPE_NAME | COLUMN_SIZE | BUFFER_LENGTH | DECIMAL_DIGITS | NUM_PREC_RADIX | NULLABLE | |
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+--+
| | | dwki_spelling_stage_summary1 | profile_id | 12 | VARCHAR | null | null | null | null | 0 | |
| | | dwki_spelling_stage_summary1 | class_id | 12 | VARCHAR | null | null | null | null | 1 | |
| | | dwki_spelling_stage_summary1 | school_id | 12 | VARCHAR | null | null | null | null | 1 | |
| | | dwki_spelling_stage_summary1 | district_id | 12 | VARCHAR | null | null | null | null | 1 | |
| | | dwki_spelling_stage_summary1 | time_period_code | 12 | VARCHAR | null | null | null | null | 1 | |
| | | dwki_spelling_stage_summary1 | spelling_stage | 12 | VARCHAR | null | null | null | null | 1 | |
| | | dwki_spelling_stage_summary1 | creation_date | 12 | VARCHAR | null | null | null | null | 1 | |
| | | dwki_spelling_stage_summary1 | modified_date | 12 | VARCHAR | null | null | null | null | 1 | |
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+--+
Manually adding records works fine:
0: jdbc:phoenix:thin:url=http://localhost:876> UPSERT INTO "dwki_spelling_stage_summary1" ("profile_id", "class_id", "school_id", "district_id", "time_period_code", "spelling_stage") VALUES ('111', '111c', '111s', '111d', 'b', 'A');
1 row affected (0.033 seconds)
0: jdbc:phoenix:thin:url=http://localhost:876> select * from "dwki_spelling_stage_summary1";
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
| profile_id | class_id | school_id | district_id | time_period_code | spelling_stage | creation_date | modified_date |
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
| 111 | 111c | 111s | 111d | b | A | | |
| 12 | | | | | | | |
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+