Posted to user@phoenix.apache.org by "datta.mane2@gmail.com" <da...@gmail.com> on 2017/10/19 13:45:18 UTC

Phoenix tables not updated using phoenix sync in apache flume

Hello,
I am trying to transfer data from a table in my local MySQL database to Phoenix on an EC2 instance.
I do not see any errors in the logs, but no data is inserted or updated in the Phoenix tables.
Any help would be really appreciated.
I start the agent with:
$ bin/flume-ng agent --conf conf --conf-file conf/stsflume-mysql-phoenix.conf --name agent1 -Dflume.root.logger=DEBUG,console

Below is the configuration:

agent1.channels.ph1.type = memory
agent1.sources.sql-source.channels = ph1
agent1.channels = ph1
agent1.sinks = PHOENIX
agent1.sources = sql-source

agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource
# URL to connect to database (currently only mysql is supported)
agent1.sources.sql-source.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/my_schema

# Hibernate Database connection properties
agent1.sources.sql-source.hibernate.connection.user = root
agent1.sources.sql-source.hibernate.connection.password = root
agent1.sources.sql-source.hibernate.connection.autocommit = true
agent1.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
agent1.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sql-source.table = dwki_spelling_stage_summary

# Columns to import (default: * imports the entire row)
agent1.sources.sql-source.columns.to.select = "profile_id","class_id","school_id","district_id","time_period_code","spelling_stage"


# Increment column properties

# Incremental value: the point from which to start taking data from the table (0 imports the entire table)
agent1.sources.sql-source.incremental.value = 0

# Query delay: the query is sent every configured number of milliseconds
agent1.sources.sql-source.run.query.delay=100


# Status file is used to save the last row read
agent1.sources.sql-source.status.file.path = /var/lib/flume
agent1.sources.sql-source.status.file.name = sql-source.status

agent1.sinks.PHOENIX.channel = ph1
agent1.sinks.PHOENIX.type = org.apache.phoenix.flume.sink.PhoenixSink
agent1.sinks.PHOENIX.batchSize = 100
agent1.sinks.PHOENIX.zookeeperQuorum=ip-11-46-175-227.ec2.internal

#agent1.sinks.PHOENIX.jdbcUrl=jdbc:phoenix:thin:url=http://ip-11-46-174-220.ec2.internal:8765;serialization=PROTOBUF;authentication=SPNEGO
agent1.sinks.PHOENIX.jdbcUrl=jdbc:phoenix:thin:url=http://ip-11-46-174-220.ec2.internal:8765;serialization=PROTOBUF
agent1.sinks.PHOENIX.table="dwki_spelling_stage_summary1"
agent1.sinks.PHOENIX.serializer=regex
agent1.sinks.PHOENIX.serializer.regex=(.*)
agent1.sinks.PHOENIX.serializer.columns="profile_id","class_id","school_id","district_id","time_period_code","spelling_stage","creation_date","modified_date"
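If the regex serializer fills one column per capture group (an assumption, though the log message below points that way), then `(.*)` can only populate a single column, while `serializer.columns` above lists eight. The Python sketch below builds a pattern with one capture group per configured column. It further assumes, without checking the keedio source, that each event body is one comma-separated row whose fields may be wrapped in double quotes; `build_row_regex` and the sample row are hypothetical.

```python
import re

# Column names copied from agent1.sinks.PHOENIX.serializer.columns above.
COLUMNS = ["profile_id", "class_id", "school_id", "district_id",
           "time_period_code", "spelling_stage", "creation_date", "modified_date"]


def build_row_regex(n_columns):
    """Hypothetical helper: one capture group per column.

    Assumes comma-separated fields, each optionally wrapped in double
    quotes. Fields containing commas or quotes are not handled here.
    """
    field = r'"?([^",]*)"?'
    return "^" + ",".join([field] * n_columns) + "$"


pattern = re.compile(build_row_regex(len(COLUMNS)))
print(pattern.pattern)
print(pattern.groups)  # number of capture groups in the generated pattern

# Hypothetical event body; the real format produced by the SQL source may differ.
sample = '"111","111c","111s","111d","b","A","",""'
m = pattern.match(sample)
print(dict(zip(COLUMNS, m.groups())) if m else "no match")
```

If those assumptions hold, the printed pattern is the kind of value that would go into `agent1.sinks.PHOENIX.serializer.regex`; rows whose fields contain commas or quotes would need a more careful pattern.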

I know it is reading from the source, because I can see sql-source.status being updated every time a record is inserted.

Is there anything I am missing?
In the logs I keep seeing this:

2017-10-19 00:43:49,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.phoenix.flume.serializer.RegexEventSerializer.upsertEvents(RegexEventSerializer.java:90)] payload 1 size doesn't match the pattern 6 

2017-10-19 00:43:49,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.phoenix.flume.serializer.RegexEventSerializer.upsertEvents(RegexEventSerializer.java:90)] payload 1 size doesn't match the pattern 6 
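The two numbers in this message look like a count comparison. Assuming (not verified against the Phoenix source) that the first is the number of capture groups in `serializer.regex` and the second is the number of columns to populate, the mismatch follows from the configured pattern `(.*)`, which has exactly one group. A minimal Python sketch of that comparison, using six column names to match the logged number; `group_count_matches` is a hypothetical helper, not a Phoenix API:

```python
import re


def group_count_matches(regex, column_names):
    """Hypothetical mirror of the check the debug message suggests:
    a pattern is only usable when its capture-group count equals the
    number of columns it is supposed to populate."""
    groups = re.compile(regex).groups
    if groups != len(column_names):
        print(f"group count {groups} != column count {len(column_names)}")
        return False
    return True


six_columns = ["profile_id", "class_id", "school_id", "district_id",
               "time_period_code", "spelling_stage"]

# The configured pattern: one group for six columns, so the check fails.
group_count_matches(r"(.*)", six_columns)

# Five "field," groups plus a final field group: six groups, so the check passes.
group_count_matches(r"([^,]*)," * 5 + r"([^,]*)", six_columns)
```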

 

Below are the table details:
0: jdbc:phoenix:thin:url=http://localhost:876> !describe "dwki_spelling_stage_summary1";

+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+--+
| TABLE_CAT  | TABLE_SCHEM  |          TABLE_NAME           |    COLUMN_NAME    | DATA_TYPE  | TYPE_NAME  | COLUMN_SIZE  | BUFFER_LENGTH  | DECIMAL_DIGITS  | NUM_PREC_RADIX  | NULLABLE  |  |
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+--+
|            |              | dwki_spelling_stage_summary1  | profile_id        | 12         | VARCHAR    | null         | null           | null            | null            | 0         |  |
|            |              | dwki_spelling_stage_summary1  | class_id          | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
|            |              | dwki_spelling_stage_summary1  | school_id         | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
|            |              | dwki_spelling_stage_summary1  | district_id       | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
|            |              | dwki_spelling_stage_summary1  | time_period_code  | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
|            |              | dwki_spelling_stage_summary1  | spelling_stage    | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
|            |              | dwki_spelling_stage_summary1  | creation_date     | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
|            |              | dwki_spelling_stage_summary1  | modified_date     | 12         | VARCHAR    | null         | null           | null            | null            | 1         |  |
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+--+

Manually upserting records works fine:

0: jdbc:phoenix:thin:url=http://localhost:876> UPSERT  INTO "dwki_spelling_stage_summary1" ("profile_id", "class_id", "school_id", "district_id", "time_period_code", "spelling_stage") VALUES ('111', '111c', '111s', '111d', 'b', 'A');

1 row affected (0.033 seconds)

0: jdbc:phoenix:thin:url=http://localhost:876> select * from "dwki_spelling_stage_summary1";

+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
| profile_id  | class_id  | school_id  | district_id  | time_period_code  | spelling_stage  | creation_date  | modified_date  |
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
| 111         | 111c      | 111s       | 111d         | b                 | A               |                |                |
| 12          |           |            |              |                   |                 |                |                |
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
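For comparison with the manual UPSERT above, here is a sketch of turning one parsed row into a parameterized UPSERT of the same shape. The identifiers are double-quoted because the table and columns have lower-case names, and Phoenix upper-cases unquoted identifiers. `build_upsert` and `quote_ident` are hypothetical helpers; the sketch only builds the SQL text and parameter tuple and does not connect to Phoenix.

```python
def quote_ident(name):
    """Double-quote an identifier so Phoenix keeps its lower-case spelling.
    Assumes the name itself contains no double quotes."""
    return '"' + name + '"'


def build_upsert(table, columns, values):
    """Hypothetical helper: an UPSERT with one ? placeholder per column."""
    if len(columns) != len(values):
        raise ValueError(f"{len(values)} values for {len(columns)} columns")
    cols = ", ".join(quote_ident(c) for c in columns)
    marks = ", ".join("?" for _ in columns)
    sql = f"UPSERT INTO {quote_ident(table)} ({cols}) VALUES ({marks})"
    return sql, tuple(values)


# Same table, columns and values as the manual statement above.
sql, params = build_upsert(
    "dwki_spelling_stage_summary1",
    ["profile_id", "class_id", "school_id", "district_id",
     "time_period_code", "spelling_stage"],
    ["111", "111c", "111s", "111d", "b", "A"],
)
print(sql)
print(params)
```

The resulting string and tuple could be bound through any client that accepts `?` placeholders, such as a JDBC `PreparedStatement`.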