You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/09 21:35:42 UTC
[1/6] incubator-streams-examples git commit: flink example
Repository: incubator-streams-examples
Updated Branches:
refs/heads/master 6e93a8f7a -> 8fe6860f7
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt b/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt
new file mode 100644
index 0000000..0590b9d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt
@@ -0,0 +1,1000 @@
+twitter:3424266646
+twitter:3277467241
+twitter:3244517214
+twitter:29953647
+twitter:63818319
+twitter:1528436754
+twitter:405580894
+twitter:322778026
+twitter:172382176
+twitter:633076833
+twitter:703735608
+twitter:2347223440
+twitter:2907929487
+twitter:950240089
+twitter:1418546592
+twitter:3318418717
+twitter:2848958704
+twitter:1120797264
+twitter:933623324
+twitter:2977700375
+twitter:328204518
+twitter:585131136
+twitter:2868789793
+twitter:158347647
+twitter:2915413161
+twitter:2217367263
+twitter:2534019247
+twitter:3033565239
+twitter:377379801
+twitter:2525341814
+twitter:3123827524
+twitter:1840932523
+twitter:3307643975
+twitter:3301777832
+twitter:961987748
+twitter:3205632255
+twitter:2799469322
+twitter:17730681
+twitter:1495242662
+twitter:1909516123
+twitter:263933760
+twitter:312651511
+twitter:2479527469
+twitter:2357151036
+twitter:346433828
+twitter:44801893
+twitter:1049697306
+twitter:2779673194
+twitter:18323141
+twitter:2172488902
+twitter:2373431930
+twitter:1038322550
+twitter:2946211549
+twitter:2911057543
+twitter:1186036284
+twitter:2878076317
+twitter:1312950464
+twitter:57323685
+twitter:32929857
+twitter:301933631
+twitter:2852217152
+twitter:330422649
+twitter:98470876
+twitter:933125156
+twitter:3237125761
+twitter:914882005
+twitter:1560239652
+twitter:900444860
+twitter:402918702
+twitter:1820690166
+twitter:3074359086
+twitter:353183684
+twitter:528544881
+twitter:1881638161
+twitter:2751762993
+twitter:3161315692
+twitter:3305680079
+twitter:1721613488
+twitter:513068659
+twitter:627186234
+twitter:3203648416
+twitter:1541163325
+twitter:1882043502
+twitter:29071727
+twitter:610104090
+twitter:2819781014
+twitter:2909115204
+twitter:213886397
+twitter:3249385591
+twitter:3086875073
+twitter:87040031
+twitter:2202487475
+twitter:334896132
+twitter:49163181
+twitter:3433984816
+twitter:543969362
+twitter:489445461
+twitter:855051894
+twitter:2792040175
+twitter:117051455
+twitter:438599410
+twitter:1387329846
+twitter:711595782
+twitter:3230662766
+twitter:2766672269
+twitter:2926781875
+twitter:863203928
+twitter:517199566
+twitter:201645935
+twitter:1555939147
+twitter:2943152669
+twitter:1324775431
+twitter:400234897
+twitter:2347416842
+twitter:1558112510
+twitter:474415350
+twitter:2153710970
+twitter:1408335014
+twitter:3633713483
+twitter:3166021013
+twitter:3530993294
+twitter:332598229
+twitter:308252069
+twitter:3317826986
+twitter:572175644
+twitter:1718271572
+twitter:2869090090
+twitter:23725109
+twitter:1926137280
+twitter:1486830500
+twitter:743080386
+twitter:3250479720
+twitter:2560441544
+twitter:2715649872
+twitter:287089153
+twitter:18761334
+twitter:2305577745
+twitter:724860668
+twitter:193306049
+twitter:2615761979
+twitter:2463299598
+twitter:1436916012
+twitter:919019185
+twitter:90502449
+twitter:50689522
+twitter:1383774679
+twitter:612784850
+twitter:410319975
+twitter:833440153
+twitter:442322844
+twitter:2181167094
+twitter:94012832
+twitter:112748352
+twitter:1474618075
+twitter:158262669
+twitter:2391506308
+twitter:882502026
+twitter:2693660146
+twitter:2971933908
+twitter:55271184
+twitter:2287356556
+twitter:2895756090
+twitter:407147132
+twitter:3262181
+twitter:313317193
+twitter:2729137002
+twitter:2939122360
+twitter:2751601568
+twitter:1215082350
+twitter:124866576
+twitter:274292311
+twitter:3310301042
+twitter:95407473
+twitter:24993769
+twitter:1342908648
+twitter:1805339413
+twitter:3118252036
+twitter:893269387
+twitter:1481149014
+twitter:463288019
+twitter:75008083
+twitter:2895489727
+twitter:965493739
+twitter:278637248
+twitter:1937513246
+twitter:422218268
+twitter:3320995462
+twitter:78682286
+twitter:2777069098
+twitter:2909553730
+twitter:2914338670
+twitter:1251667531
+twitter:2764034755
+twitter:532659717
+twitter:269002510
+twitter:29373713
+twitter:358075450
+twitter:633880614
+twitter:200374379
+twitter:141628294
+twitter:1513028977
+twitter:116798791
+twitter:2937455354
+twitter:246194623
+twitter:793925970
+twitter:115594167
+twitter:82463176
+twitter:324774974
+twitter:185844856
+twitter:2462295999
+twitter:3555105016
+twitter:1029169117
+twitter:2689309484
+twitter:1587145976
+twitter:1607241271
+twitter:3032276402
+twitter:183916933
+twitter:63766245
+twitter:151217255
+twitter:2781098109
+twitter:252081559
+twitter:1608788256
+twitter:41984573
+twitter:1896587353
+twitter:40136999
+twitter:295505814
+twitter:384867933
+twitter:116947371
+twitter:255703939
+twitter:2687800732
+twitter:76543916
+twitter:881649782
+twitter:2765729924
+twitter:1715695669
+twitter:1965383022
+twitter:2888214228
+twitter:21820514
+twitter:1727966414
+twitter:2581992818
+twitter:103999565
+twitter:741018846
+twitter:446792386
+twitter:2568989424
+twitter:2780674777
+twitter:465934916
+twitter:3378294885
+twitter:2885604327
+twitter:3336273419
+twitter:130742941
+twitter:2327629099
+twitter:1103818104
+twitter:3050036073
+twitter:2882456842
+twitter:2702914248
+twitter:2153674818
+twitter:132825659
+twitter:289758699
+twitter:2995946100
+twitter:3027449217
+twitter:2708029160
+twitter:1529367002
+twitter:608170333
+twitter:140446819
+twitter:2790688993
+twitter:1597308192
+twitter:14462028
+twitter:104062608
+twitter:370274893
+twitter:356145607
+twitter:566542629
+twitter:112587243
+twitter:39372070
+twitter:146853060
+twitter:2440984657
+twitter:3074554539
+twitter:204701034
+twitter:887623447
+twitter:1971521630
+twitter:2457208175
+twitter:466113358
+twitter:1574643830
+twitter:1465533884
+twitter:2500404589
+twitter:1633154150
+twitter:1349117870
+twitter:1658071267
+twitter:593022891
+twitter:3094177813
+twitter:1304672510
+twitter:3385525697
+twitter:2916225552
+twitter:2759773715
+twitter:1369215552
+twitter:1058390078
+twitter:2532850321
+twitter:351483656
+twitter:1902796704
+twitter:113000738
+twitter:2241245557
+twitter:2416606754
+twitter:408729540
+twitter:2530294556
+twitter:2936808249
+twitter:3138999692
+twitter:2679987883
+twitter:1448537377
+twitter:2524773906
+twitter:942079406
+twitter:2217584389
+twitter:3059427504
+twitter:3028507725
+twitter:632766658
+twitter:3302663431
+twitter:2914832897
+twitter:93487101
+twitter:2786054379
+twitter:1339647769
+twitter:531402307
+twitter:402066474
+twitter:337936675
+twitter:2760568625
+twitter:1385916396
+twitter:2595560922
+twitter:421910477
+twitter:1713100813
+twitter:352016040
+twitter:415247994
+twitter:1883606209
+twitter:2974994111
+twitter:1118022211
+twitter:3096979637
+twitter:711889867
+twitter:262890561
+twitter:233810062
+twitter:1877177168
+twitter:964106670
+twitter:164985413
+twitter:2920420361
+twitter:318936782
+twitter:3289826764
+twitter:145873735
+twitter:2523059919
+twitter:2409896179
+twitter:2292047201
+twitter:285674825
+twitter:2765549780
+twitter:2359541905
+twitter:2419103894
+twitter:358884588
+twitter:206231205
+twitter:136500778
+twitter:1397885138
+twitter:2625422097
+twitter:2524578002
+twitter:604278657
+twitter:2625634867
+twitter:73168019
+twitter:407448958
+twitter:189276174
+twitter:2507896925
+twitter:80880449
+twitter:520177827
+twitter:418469102
+twitter:2925075456
+twitter:615730636
+twitter:2995998941
+twitter:2697270934
+twitter:497135011
+twitter:2944598402
+twitter:428706893
+twitter:1345291712
+twitter:388751708
+twitter:130092079
+twitter:2984741882
+twitter:1047514436
+twitter:15927135
+twitter:2884357840
+twitter:294362779
+twitter:2870985800
+twitter:1720400449
+twitter:130027314
+twitter:2970518577
+twitter:240923858
+twitter:1613498838
+twitter:708321211
+twitter:1403382426
+twitter:2602186970
+twitter:1596855998
+twitter:280062526
+twitter:2716454552
+twitter:268720451
+twitter:2869044811
+twitter:1911762488
+twitter:392373280
+twitter:2151082712
+twitter:2770919004
+twitter:231541900
+twitter:60122778
+twitter:390006102
+twitter:240167506
+twitter:1558314660
+twitter:221608257
+twitter:852829933
+twitter:461669243
+twitter:239778483
+twitter:502146157
+twitter:1471963970
+twitter:276426707
+twitter:2336546150
+twitter:323595235
+twitter:128670043
+twitter:1308641714
+twitter:1411112756
+twitter:3011727217
+twitter:3082006921
+twitter:450537474
+twitter:2673101407
+twitter:2416030447
+twitter:51952627
+twitter:708057486
+twitter:833620748
+twitter:3024957797
+twitter:2147572362
+twitter:1712467098
+twitter:2899300501
+twitter:1348351772
+twitter:2923114629
+twitter:2779232814
+twitter:21306308
+twitter:1466314507
+twitter:1224588289
+twitter:81307783
+twitter:42717316
+twitter:315972617
+twitter:434649827
+twitter:105839296
+twitter:366063496
+twitter:34045892
+twitter:3076447389
+twitter:92437198
+twitter:3124335006
+twitter:1444393410
+twitter:351737762
+twitter:1919360383
+twitter:2836048345
+twitter:1670939112
+twitter:722140159
+twitter:92939425
+twitter:2932728756
+twitter:2831872033
+twitter:1354255123
+twitter:1689738186
+twitter:463578260
+twitter:2881582438
+twitter:912252510
+twitter:3226221887
+twitter:390827200
+twitter:269169237
+twitter:1450007192
+twitter:2735984326
+twitter:3029836305
+twitter:28291382
+twitter:785668627
+twitter:567287970
+twitter:1480004420
+twitter:131927864
+twitter:2958631308
+twitter:488490020
+twitter:2603422688
+twitter:3186614985
+twitter:177373618
+twitter:2466506329
+twitter:2651294251
+twitter:3367170684
+twitter:2673870882
+twitter:369098635
+twitter:242011326
+twitter:18099277
+twitter:1922210574
+twitter:3093762445
+twitter:470634878
+twitter:1674607392
+twitter:2920526283
+twitter:3261677580
+twitter:2192187078
+twitter:485599960
+twitter:1854850729
+twitter:95198467
+twitter:2228217740
+twitter:2171528344
+twitter:2957461230
+twitter:226615737
+twitter:1624183567
+twitter:158597677
+twitter:2909224690
+twitter:19278114
+twitter:2488284258
+twitter:2777071149
+twitter:1598064697
+twitter:2740691127
+twitter:3100908480
+twitter:1147010126
+twitter:2741161553
+twitter:439971668
+twitter:3247227273
+twitter:2884261062
+twitter:3127250575
+twitter:2942021278
+twitter:539428196
+twitter:409599986
+twitter:3161801331
+twitter:2328613860
+twitter:1903013437
+twitter:313082004
+twitter:2580495721
+twitter:209464435
+twitter:600172085
+twitter:339541217
+twitter:62219810
+twitter:583287316
+twitter:295891933
+twitter:561683767
+twitter:229192352
+twitter:1357869918
+twitter:235438136
+twitter:1599249169
+twitter:583879210
+twitter:507744802
+twitter:1696336261
+twitter:2323537206
+twitter:36882220
+twitter:541528426
+twitter:956202559
+twitter:387936537
+twitter:211658842
+twitter:2685186010
+twitter:2581656488
+twitter:391154378
+twitter:122932105
+twitter:409764153
+twitter:129737967
+twitter:2848806360
+twitter:3054860719
+twitter:372199585
+twitter:2316121597
+twitter:703345746
+twitter:3335505287
+twitter:2466151422
+twitter:380038166
+twitter:420561214
+twitter:2977085351
+twitter:110955327
+twitter:3004295886
+twitter:2362857361
+twitter:3053844460
+twitter:3182081552
+twitter:324208260
+twitter:2571790321
+twitter:1061498868
+twitter:2187395299
+twitter:2187482779
+twitter:3096652530
+twitter:2538239672
+twitter:3809634552
+twitter:2306848839
+twitter:1544061547
+twitter:151075965
+twitter:3250238556
+twitter:16157689
+twitter:1692663644
+twitter:1356000732
+twitter:436774994
+twitter:45503055
+twitter:1086037316
+twitter:2798297775
+twitter:2923485772
+twitter:58731726
+twitter:211816170
+twitter:885013716
+twitter:2608529078
+twitter:2954917057
+twitter:2271021600
+twitter:173743066
+twitter:451543575
+twitter:3219728436
+twitter:399824828
+twitter:2464688153
+twitter:2541069631
+twitter:1522892262
+twitter:3167829845
+twitter:944851321
+twitter:2471474509
+twitter:68073858
+twitter:1496221376
+twitter:13979882
+twitter:2218792189
+twitter:302123873
+twitter:2845915546
+twitter:431402814
+twitter:1364254945
+twitter:2711277666
+twitter:2766696876
+twitter:2495441323
+twitter:2844317433
+twitter:138009079
+twitter:2578631100
+twitter:478167529
+twitter:1222728360
+twitter:1323688411
+twitter:2883066187
+twitter:2443554697
+twitter:411631689
+twitter:68537682
+twitter:1027019269
+twitter:1660752493
+twitter:987324488
+twitter:2764106926
+twitter:2184511674
+twitter:103419315
+twitter:2310456424
+twitter:1572938088
+twitter:2554895281
+twitter:34138105
+twitter:2942100621
+twitter:160517898
+twitter:285075974
+twitter:2260805169
+twitter:19390498
+twitter:301696842
+twitter:2588239985
+twitter:2886588596
+twitter:2962622367
+twitter:1867897483
+twitter:2827053488
+twitter:1447767319
+twitter:2924491293
+twitter:167327096
+twitter:3309592402
+twitter:2795575638
+twitter:578758971
+twitter:2888665561
+twitter:30542348
+twitter:1437049609
+twitter:2242541566
+twitter:74354017
+twitter:58900854
+twitter:2159055031
+twitter:246517688
+twitter:2916873012
+twitter:1110055280
+twitter:562430843
+twitter:761797794
+twitter:1648208552
+twitter:301483343
+twitter:2896842048
+twitter:522103295
+twitter:1578517986
+twitter:2659610776
+twitter:2890560429
+twitter:1427665578
+twitter:268363160
+twitter:563709041
+twitter:2172300002
+twitter:2791262431
+twitter:3039809351
+twitter:2914940301
+twitter:2746560353
+twitter:2892191616
+twitter:71596845
+twitter:233770184
+twitter:1530949130
+twitter:105906110
+twitter:755347622
+twitter:490836906
+twitter:357603454
+twitter:324517203
+twitter:2835402315
+twitter:3285479894
+twitter:86368327
+twitter:238219970
+twitter:3153173945
+twitter:2732361234
+twitter:2357626327
+twitter:346602505
+twitter:13732632
+twitter:44055265
+twitter:2998032219
+twitter:482072312
+twitter:1721073866
+twitter:1386781034
+twitter:168194206
+twitter:1213443144
+twitter:181296114
+twitter:942598400
+twitter:2955577216
+twitter:582056669
+twitter:747540468
+twitter:2371722140
+twitter:360824004
+twitter:3023711736
+twitter:207032580
+twitter:2748107976
+twitter:464428175
+twitter:3150849096
+twitter:85450014
+twitter:2840066340
+twitter:2287819200
+twitter:240931426
+twitter:553606800
+twitter:397876544
+twitter:2195298230
+twitter:2601812005
+twitter:3013344739
+twitter:17599363
+twitter:1572639314
+twitter:3377673407
+twitter:303420278
+twitter:2811879995
+twitter:526860891
+twitter:346333874
+twitter:113568311
+twitter:705488304
+twitter:3238867619
+twitter:333772149
+twitter:373309716
+twitter:300472003
+twitter:3223424681
+twitter:2895699896
+twitter:3241119570
+twitter:1147453440
+twitter:3135402609
+twitter:521763744
+twitter:2702966971
+twitter:2878317616
+twitter:845031697
+twitter:2855454471
+twitter:3051902539
+twitter:482306439
+twitter:129173738
+twitter:306572138
+twitter:2941951538
+twitter:762707233
+twitter:2732608168
+twitter:1228456939
+twitter:246020724
+twitter:1920607602
+twitter:14434245
+twitter:1254943537
+twitter:1520746602
+twitter:150745124
+twitter:1350160351
+twitter:38707222
+twitter:267766858
+twitter:2992121760
+twitter:712666764
+twitter:983036864
+twitter:289490939
+twitter:269797384
+twitter:100215048
+twitter:3099557245
+twitter:2339741570
+twitter:306005146
+twitter:1182227460
+twitter:288235870
+twitter:1412832260
+twitter:455190443
+twitter:489912183
+twitter:448994061
+twitter:2944595072
+twitter:2453094914
+twitter:2899434206
+twitter:59288818
+twitter:2824706688
+twitter:423363992
+twitter:972850482
+twitter:997868714
+twitter:1203750733
+twitter:176147179
+twitter:115110596
+twitter:2978397615
+twitter:2528946267
+twitter:620180433
+twitter:365949935
+twitter:110609853
+twitter:1533494268
+twitter:2723839166
+twitter:34186887
+twitter:2864430424
+twitter:76942977
+twitter:361086733
+twitter:2724200587
+twitter:635206139
+twitter:2757801421
+twitter:19651443
+twitter:3364322949
+twitter:2770576744
+twitter:2168612560
+twitter:764020297
+twitter:2558268513
+twitter:2855384901
+twitter:1881414907
+twitter:2502212139
+twitter:3250037586
+twitter:2525185944
+twitter:591375982
+twitter:707911211
+twitter:3025041666
+twitter:19785599
+twitter:2311172950
+twitter:922817815
+twitter:739363530
+twitter:2812894393
+twitter:2496283986
+twitter:206162815
+twitter:590916342
+twitter:354053245
+twitter:2735195854
+twitter:2788759128
+twitter:3510947235
+twitter:3490740532
+twitter:2920847304
+twitter:2681444558
+twitter:2856805755
+twitter:3103899682
+twitter:145893832
+twitter:3065663910
+twitter:2736009516
+twitter:2835226230
+twitter:1590913771
+twitter:2700889555
+twitter:2221272164
+twitter:109780161
+twitter:700221218
+twitter:541753453
+twitter:126575915
+twitter:274336817
+twitter:2498172455
+twitter:2809515630
+twitter:2588774684
+twitter:296734891
+twitter:2212410182
+twitter:243027454
+twitter:1336526904
+twitter:397062736
+twitter:449331876
+twitter:30619307
+twitter:2310483811
+twitter:2437586509
+twitter:191710730
+twitter:1084185378
+twitter:2831486681
+twitter:1606477879
+twitter:969600636
+twitter:529783214
+twitter:2928131586
+twitter:190041293
+twitter:2967031274
+twitter:2165962781
+twitter:376501355
+twitter:284137985
+twitter:266863824
+twitter:407944074
+twitter:108456036
+twitter:1641294422
+twitter:900733706
+twitter:1063071450
+twitter:1682722328
+twitter:341419520
+twitter:1644293778
+twitter:2245151467
+twitter:511176989
+twitter:241922669
+twitter:3388315624
+twitter:1909431145
+twitter:2223820028
+twitter:600581315
+twitter:1723555076
+twitter:2748445313
+twitter:561211823
+twitter:561022931
+twitter:2751429993
+twitter:2714908343
+twitter:16165257
+twitter:524623359
+twitter:306741266
+twitter:469994381
+twitter:2561892084
+twitter:998802661
+twitter:1492924374
+twitter:789039140
+twitter:210150093
+twitter:817544820
+twitter:35740178
+twitter:326162841
+twitter:1447331628
+twitter:17493441
+twitter:2874693608
+twitter:965027312
+twitter:261936985
+twitter:510564259
+twitter:728031187
+twitter:164696234
+twitter:2204519310
+twitter:1626241164
+twitter:1024940588
+twitter:221486613
+twitter:571084565
+twitter:3029264508
+twitter:221716563
+twitter:2211417135
+twitter:499972359
+twitter:1565989165
+twitter:2436927208
+twitter:381029291
+twitter:2730580620
+twitter:3436438413
+twitter:2466014604
+twitter:538990742
+twitter:2935470687
+twitter:1162845468
+twitter:468108082
+twitter:2383897542
+twitter:2542119658
+twitter:1962281514
+twitter:171235080
+twitter:536915535100125185
+twitter:2841076618
+twitter:3006098500
+twitter:1057158554
+twitter:3245676721
+twitter:251087536
+twitter:3082811549
+twitter:281785349
+twitter:1674871100
+twitter:1898659951
+twitter:1414854156
+twitter:428693618
+twitter:2385953101
+twitter:2281213477
+twitter:2786368894
+twitter:2253203998
+twitter:357277727
+twitter:1358707970
+twitter:545186198
+twitter:3033613587
+twitter:107121821
+twitter:595965259
+twitter:583894637
+twitter:1306698787
+twitter:442262869
+twitter:2868353318
+twitter:1908436844
+twitter:271982042
+twitter:495202171
+twitter:251586884
+twitter:3151032974
+twitter:2213682568
+twitter:1203133039
+twitter:193128957
+twitter:597407120
+twitter:2781102086
+twitter:369254505
+twitter:62831036
+twitter:2328734640
+twitter:2579064082
+twitter:3271313827
+twitter:2880366619
+twitter:2323026113
+twitter:446380518
+twitter:245418139
+twitter:261211664
+twitter:1893329208
+twitter:3406596309
+twitter:584967077
+twitter:1708862304
+twitter:388961426
+twitter:2421535351
+twitter:2194375668
+twitter:2790313673
+twitter:2728894977
+twitter:2829174824
+twitter:784541196
+twitter:959902393
+twitter:249705367
+twitter:1677679309
+twitter:2825975175
+twitter:1305768366
+twitter:373475046
+twitter:785362464
+twitter:419607671
+twitter:61031675
+twitter:3854236343
+twitter:714603248
+twitter:1301447720
+twitter:827660912
+twitter:2383764684
+twitter:3180084906
+twitter:3265558124
+twitter:608536922
+twitter:238943561
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
new file mode 100644
index 0000000..e74f00c
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
@@ -0,0 +1,10 @@
+twitter {
+ endpoint = followers
+ version = 1.1
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
new file mode 100644
index 0000000..63a6481
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
@@ -0,0 +1,10 @@
+twitter {
+ version = 1.1
+ endpoint = statuses
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
new file mode 100644
index 0000000..6e0a879
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
@@ -0,0 +1,10 @@
+twitter {
+ version = 1.1
+ endpoint = users
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/asf.txt
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/asf.txt b/flink/flink-twitter-collection/src/test/resources/asf.txt
new file mode 100644
index 0000000..c2b1ea1
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/asf.txt
@@ -0,0 +1 @@
+twitter:18055613
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
new file mode 100644
index 0000000..aa2b1a9
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -0,0 +1,98 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.nio.file.{Files, Paths}
+
+import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+ * Integration tests for FlinkTwitterFollowingPipeline.
+ *
+ * Each test collects the friends or followers of the ids listed in
+ * target/test-classes/asf.txt and asserts that the pipeline output under
+ * target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/<endpoint>
+ * eventually exists and has more than a minimum number of lines.
+ *
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterFollowingPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
+
+  /** Base directory for both the pipeline input (asf.txt) and its output. */
+  private val TestClassesDir = "target/test-classes"
+
+  /** Writer path, relative to TestClassesDir, under which each endpoint's output is written. */
+  private val OutputPrefix = "pdb-twitter-collect/FlinkTwitterFollowingPipeline"
+
+  // Test methods are declared `: Unit` explicitly: TestNG skips @Test methods that
+  // return a value unless the suite sets allow-return-values.
+  @Test
+  def flinkTwitterFollowersPipelineFriendsIT: Unit =
+    runPipelineAndCheck(endpoint = "friends", minLines = 90)
+
+  @Test
+  def flinkTwitterFollowersPipelineFollowersIT: Unit =
+    runPipelineAndCheck(endpoint = "followers", minLines = 500)
+
+  /**
+   * Runs FlinkTwitterFollowingPipeline against one Twitter endpoint to completion,
+   * then waits up to 30 seconds for its output file to appear and fill up.
+   *
+   * @param endpoint Twitter endpoint to collect ("friends" or "followers"); also the
+   *                 last segment of the output path.
+   * @param minLines the output must contain strictly more than this many lines.
+   */
+  private def runPipelineAndCheck(endpoint: String, minLines: Int): Unit = {
+    val testConfig: TwitterFollowingPipelineConfiguration =
+      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration])
+        .detectConfiguration(StreamsConfigurator.getConfig)
+    testConfig.getTwitter.setEndpoint(endpoint)
+
+    val source: HdfsReaderConfiguration = new HdfsReaderConfiguration()
+      .withReaderPath("asf.txt")
+      .withScheme(HdfsConfiguration.Scheme.FILE)
+      .asInstanceOf[HdfsReaderConfiguration]
+    source.setPath(TestClassesDir)
+    testConfig.setSource(source)
+
+    val writerPath = s"$OutputPrefix/$endpoint"
+    val destination: HdfsWriterConfiguration = new HdfsWriterConfiguration()
+      .withWriterPath(writerPath)
+      .withScheme(HdfsConfiguration.Scheme.FILE)
+      .asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath(TestClassesDir)
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000L)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start()
+    jobThread.join()
+
+    // Assert on exactly the location the writer was configured with
+    // (destination path + writer path), so the two cannot drift apart.
+    val outputPath = s"$TestClassesDir/$writerPath"
+    eventually(timeout(30.seconds), interval(1.second)) {
+      assert(Files.exists(Paths.get(outputPath)))
+      assert(countLines(outputPath) > minLines)
+    }
+  }
+
+  /** Counts the lines of the UTF-8 text file at `path`, closing it afterwards. */
+  private def countLines(path: String): Int = {
+    val reader = Source.fromFile(path, "UTF-8")
+    try reader.getLines().size
+    finally reader.close()
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
new file mode 100644
index 0000000..8a942e5
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -0,0 +1,84 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.nio.file.{Files, Paths}
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
+import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+import org.testng.annotations.Test
+
+/**
+ * Integration test for FlinkTwitterPostsPipeline.
+ *
+ * Collects posts for the ids listed in target/test-classes/asf.txt and asserts
+ * that the output under target/test-classes/pdb-twitter-collect/FlinkTwitterPostsPipeline
+ * eventually exists and has at least 200 lines.
+ *
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterPostsPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
+
+  /** Base directory for both the pipeline input (asf.txt) and its output. */
+  private val TestClassesDir = "target/test-classes"
+
+  /** Writer path, relative to TestClassesDir, under which the pipeline output is written. */
+  private val WriterPath = "pdb-twitter-collect/FlinkTwitterPostsPipeline"
+
+  // Declared `: Unit` explicitly: TestNG skips @Test methods that return a value
+  // unless the suite sets allow-return-values.
+  @Test
+  def flinkTwitterPostsPipelineIT: Unit = {
+    val testConfig: TwitterPostsPipelineConfiguration =
+      new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration])
+        .detectConfiguration(StreamsConfigurator.getConfig)
+
+    val source: HdfsReaderConfiguration = new HdfsReaderConfiguration()
+      .withReaderPath("asf.txt")
+      .withScheme(HdfsConfiguration.Scheme.FILE)
+      .asInstanceOf[HdfsReaderConfiguration]
+    source.setPath(TestClassesDir)
+    testConfig.setSource(source)
+
+    val destination: HdfsWriterConfiguration = new HdfsWriterConfiguration()
+      .withWriterPath(WriterPath)
+      .withScheme(HdfsConfiguration.Scheme.FILE)
+      .asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath(TestClassesDir)
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000L)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterPostsPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start()
+    jobThread.join()
+
+    // Assert on exactly the location the writer was configured with
+    // (destination path + writer path), so the two cannot drift apart.
+    val outputPath = s"$TestClassesDir/$WriterPath"
+    eventually(timeout(30.seconds), interval(1.second)) {
+      assert(Files.exists(Paths.get(outputPath)))
+      assert(countLines(outputPath) >= 200)
+    }
+  }
+
+  /** Counts the lines of the UTF-8 text file at `path`, closing it afterwards. */
+  private def countLines(path: String): Int = {
+    val reader = Source.fromFile(path, "UTF-8")
+    try reader.getLines().size
+    finally reader.close()
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
new file mode 100644
index 0000000..3d21244
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -0,0 +1,56 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.nio.file.{Files, Paths}
+
+import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.scalatest.FlatSpec
+import org.scalatest._
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+import org.scalatest.Ignore
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+import org.testng.annotations.Test
+
+/**
+ * Integration test for FlinkTwitterUserInformationPipeline.
+ *
+ * Reads twitter ids from target/test-classes/1000twitterids.txt, runs the pipeline in test mode
+ * against the local filesystem, and checks that more than 500 lines were written to the
+ * configured destination.
+ *
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
+
+  // Single source of truth for the output location: the writer configuration and the assertions
+  // below are both derived from these, so they cannot drift apart again.
+  private val OUTPUT_ROOT: String = "target/test-classes"
+  private val OUTPUT_NAME: String = "pdb-twitter-collect/TwitterUserInformationPipeline"
+
+  @Test
+  def flinkTwitterUserInformationPipelineIT = {
+
+    val testConfig : TwitterUserInformationPipelineConfiguration =
+      new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
+    source.setPath("target/test-classes")
+    testConfig.setSource(source)
+    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath(OUTPUT_NAME).withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath(OUTPUT_ROOT)
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000L)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterUserInformationPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start()
+    jobThread.join()
+
+    // NOTE(review): assumes the writer places output at <path>/<writerPath>;
+    // confirm against FlinkBase.buildWriterPath.
+    val outputFile = Paths.get(OUTPUT_ROOT, OUTPUT_NAME)
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(outputFile))
+      // Close the source on every attempt; eventually() may retry many times.
+      val output = Source.fromFile(outputFile.toFile, "UTF-8")
+      try {
+        assert(output.getLines.size > 500)
+      } finally {
+        output.close()
+      }
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
new file mode 100644
index 0000000..7054e89
--- /dev/null
+++ b/flink/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License.  You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-examples</artifactId>
+        <version>0.4-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>streams-examples-flink</artifactId>
+    <name>streams-examples-flink</name>
+    <description>Contributed examples of use cases for Streams using flink</description>
+    <packaging>pom</packaging>
+
+    <!-- Aggregator pom: declares no properties or build configuration of its own. -->
+    <modules>
+        <module>flink-twitter-collection</module>
+    </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5d25974..9984a5b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>streams-master</artifactId>
<groupId>org.apache.streams</groupId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</parent>
<artifactId>streams-examples</artifactId>
@@ -172,6 +172,7 @@
</properties>
<modules>
+ <module>flink</module>
<module>local</module>
</modules>
@@ -186,39 +187,39 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-filters</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
@@ -230,32 +231,32 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-graph</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-hdfs</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-mongo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
[3/6] incubator-streams-examples git commit: flink examples building
and running
Posted by sb...@apache.org.
flink examples building and running
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0112a838
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0112a838
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0112a838
Branch: refs/heads/master
Commit: 0112a83874bb7f896a4e3964d5fde75e5967afe6
Parents: 4491cfe
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Sep 29 19:15:39 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Sep 29 19:15:39 2016 -0500
----------------------------------------------------------------------
flink/flink-twitter-collection/pom.xml | 127 +++++++++++------
.../TwitterSpritzerPipelineConfiguration.json | 29 ++++
.../FlinkTwitterFollowingPipeline.scala | 11 +-
.../collection/FlinkTwitterPostsPipeline.scala | 22 ++-
.../FlinkTwitterSpritzerPipeline.scala | 138 +++++++++++++++++++
.../FlinkTwitterUserInformationPipeline.scala | 28 ++--
.../TwitterSpritzerPipelineConfiguration.json | 29 ++++
.../FlinkTwitterFollowingPipeline.conf | 10 --
...linkTwitterFollowingPipelineFollowersIT.conf | 16 +++
.../FlinkTwitterFollowingPipelineFriendsIT.conf | 16 +++
.../resources/FlinkTwitterPostsPipeline.conf | 10 --
.../resources/FlinkTwitterPostsPipelineIT.conf | 15 ++
.../FlinkTwitterUserInformationPipeline.conf | 10 --
.../FlinkTwitterUserInformationPipelineIT.conf | 15 ++
.../test/FlinkTwitterFollowingPipelineIT.scala | 71 +++++-----
.../test/FlinkTwitterPostsPipelineIT.scala | 38 ++---
.../test/FlinkTwitterSpritzerPipelineIT.scala | 57 ++++++++
.../FlinkTwitterUserInformationPipelineIT.scala | 33 +++--
flink/pom.xml | 3 -
local/elasticsearch-hdfs/pom.xml | 14 +-
local/elasticsearch-reindex/pom.xml | 2 +-
local/mongo-elasticsearch-sync/pom.xml | 12 +-
local/twitter-follow-graph/pom.xml | 10 +-
local/twitter-history-elasticsearch/pom.xml | 14 +-
local/twitter-userstream-elasticsearch/pom.xml | 14 +-
25 files changed, 543 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 33b05fe..2d35035 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -34,17 +34,38 @@
<description>Collects twitter documents using flink.</description>
<properties>
- <docker.repo>apachestreams</docker.repo>
+ <testng.version>6.9.10</testng.version>
<hdfs.version>2.7.0</hdfs.version>
<flink.version>1.1.2</flink.version>
+ <scala.version>2.10.6</scala.version>
+ <scalatest.version>2.2.5</scalatest.version>
<scala.suffix>2.10</scala.suffix>
+ <scala-maven.plugin.version>3.2.2</scala-maven.plugin.version>
</properties>
<dependencies>
<dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <version>1.3</version>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.suffix}</artifactId>
+ <version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -65,13 +86,11 @@
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -277,6 +296,19 @@
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>${testng.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
@@ -293,17 +325,6 @@
</testResource>
</testResources>
<plugins>
- <plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <configuration>
- <filesets>
- <fileset>
- <directory>data</directory>
- <followSymlinks>false</followSymlinks>
- </fileset>
- </filesets>
- </configuration>
- </plugin>
<!-- This binary runs with logback -->
<!-- Keep log4j out -->
<plugin>
@@ -334,8 +355,56 @@
</executions>
</plugin>
<plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>${scala-maven.plugin.version}</version>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.build.finalName}</finalName>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/logback.xml</exclude>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
@@ -348,7 +417,6 @@
<sourcePath>src/main/jsonschema</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
- <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
<useJodaDates>false</useJodaDates>
</configuration>
<executions>
@@ -379,25 +447,6 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <includes>**/*.json</includes>
- <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
- <includeGroupIds>org.apache.streams</includeGroupIds>
- <includeTypes>test-jar</includeTypes>
- </configuration>
- <executions>
- <execution>
- <id>test-resource-dependencies</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>unpack-dependencies</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.12.4</version>
<executions>
@@ -410,10 +459,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2ac7d32..2fd9336 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -3,6 +3,7 @@ package org.apache.streams.examples.flink.twitter.collection
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.core.fs.FileSystem
@@ -17,10 +18,10 @@ import org.apache.streams.twitter.TwitterFollowingConfiguration
import org.apache.streams.twitter.pojo.Follow
import org.apache.streams.twitter.provider.TwitterFollowingProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
@@ -75,6 +76,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
return false
}
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
return true
}
@@ -134,7 +141,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
def collectConnections(id : String, out : Collector[StreamsDatum]) = {
val twitProvider: TwitterFollowingProvider =
new TwitterFollowingProvider(
- twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
+ twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
)
twitProvider.prepare(twitProvider)
twitProvider.startStream()
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index f8e221c..beea973 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -3,11 +3,8 @@ package org.apache.streams.examples.flink.twitter.collection
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
-import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
-import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
@@ -33,6 +30,7 @@ import org.apache.streams.twitter.TwitterUserInformationConfiguration
import org.apache.streams.twitter.pojo.{Tweet, User}
import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
@@ -84,6 +82,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
return false
}
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
return true
}
@@ -105,16 +109,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val outPath = buildWriterPath(config.getDestination)
- //val inProps = buildKafkaProps(config.getSourceTopic)
-
val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
- //val idTopicIn = new KafkaSink()
-
-// val idTopicOut : DataStream[String] = env.addSource[String](
-// new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
-// inProps));
-
val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
// these datums contain 'Tweet' objects
@@ -149,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val twitterConfiguration = config.getTwitter
val twitProvider: TwitterTimelineProvider =
new TwitterTimelineProvider(
- twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l)
+ twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
)
twitProvider.prepare(twitProvider)
twitProvider.startStream()
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
new file mode 100644
index 0000000..b615806
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -0,0 +1,138 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.provider.TwitterStreamProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Entry point and configuration validation for the Twitter "spritzer" stream pipeline.
+ *
+ * Created by sblackmon on 7/29/15.
+ */
+object FlinkTwitterSpritzerPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
+
+  // Log under this pipeline's own class (previously copy-pasted as FlinkTwitterPostsPipeline).
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Runs FlinkBase's main, detects a TwitterSpritzerPipelineConfiguration from the inherited
+   * `typesafe` config, validates it with [[setup]], then runs the pipeline on its own thread and
+   * blocks until that thread finishes. Exits the JVM with status 1 if validation returns false.
+   */
+  override def main(args: Array[String]): Unit = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+    if (!setup(jobConfig)) System.exit(1)
+    val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  /**
+   * Validates a spritzer pipeline configuration.
+   *
+   * @param jobConfig the configuration to validate
+   * @return false (after logging the problem and printing it to stderr) when the config, its
+   *         destination, or its twitter section is null; true otherwise
+   * @throws NullPointerException if the twitter oauth section is null
+   * @throws IllegalArgumentException if any oauth token, secret, or consumer key/secret is null or empty
+   */
+  def setup(jobConfig: TwitterSpritzerPipelineConfiguration): Boolean = {
+
+    LOGGER.info("TwitterSpritzerPipelineConfiguration: " + jobConfig)
+
+    if (jobConfig == null) return reject("jobConfig is null!")
+    if (jobConfig.getDestination == null) return reject("jobConfig.getDestination is null!")
+    if (jobConfig.getTwitter == null) return reject("jobConfig.getTwitter is null!")
+
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
+    true
+  }
+
+  // Reports a validation failure to the log and to stderr, and returns false for setup() to propagate.
+  private def reject(message: String): Boolean = {
+    LOGGER.error(message)
+    System.err.println(message)
+    false
+  }
+
+}
+
+/**
+ * Flink streaming job that reads documents from a TwitterStreamProvider and writes each one,
+ * as a String, to the configured destination.
+ *
+ * @param config pipeline configuration; by default detected from StreamsConfigurator.getConfig
+ */
+class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterSpritzerPipeline._
+
+  /**
+   * Builds and executes the streaming job. When config.getTest is false, documents go to a
+   * RollingSink with parallelism 3; otherwise they are written as text, overwriting the target.
+   */
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val streamSource: DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter))
+
+    if (config.getTest == false)
+      streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      streamSource.writeAsText(outPath, FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    // Submit under this pipeline's own id (previously copy-pasted as "FlinkTwitterPostsPipeline").
+    env.execute(STREAMS_ID)
+  }
+
+  /**
+   * Source that starts a TwitterStreamProvider in open(), then repeatedly sleeps providerWaitMs,
+   * drains the provider's current batch, and emits each datum's document cast to String, until
+   * the provider reports it is no longer running.
+   */
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+
+    var twitProvider: TwitterStreamProvider = _
+
+    @throws[Exception]
+    override def open(parameters: Configuration): Unit = {
+      twitProvider = new TwitterStreamProvider(sourceConfig)
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+    }
+
+    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        val batch: Iterator[StreamsDatum] = twitProvider.readCurrent().iterator()
+        // foreach rather than toList.map: emitting is a pure side effect, no list is needed.
+        batch.foreach(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+      } while (twitProvider.isRunning)
+    }
+
+    override def cancel(): Unit = cleanUpProvider()
+
+    @throws[Exception]
+    override def close(): Unit = cleanUpProvider()
+
+    // close() can be invoked even if open() failed before the provider was created; guard so an
+    // NPE here does not mask the original failure.
+    private def cleanUpProvider(): Unit = {
+      if (twitProvider != null) twitProvider.cleanUp()
+    }
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index a081c74..867255d 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -1,28 +1,18 @@
package org.apache.streams.examples.flink.twitter.collection
-import java.lang
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
-
-import scala.collection.JavaConversions._
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.streams.examples.flink.FlinkBase
import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers._
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -30,13 +20,13 @@ import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.User
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.JavaConversions._
+
/**
* Created by sblackmon on 3/15/16.
*/
@@ -85,6 +75,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
return false
}
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
return true
}
@@ -137,7 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
- out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+ out.collect(input.map(id => toProviderId(id)).toList)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
deleted file mode 100644
index e74f00c..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
- endpoint = followers
- version = 1.1
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
new file mode 100644
index 0000000..87057be
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -0,0 +1,16 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
+}
+twitter.endpoint = followers
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
new file mode 100644
index 0000000..b5212ed
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -0,0 +1,16 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
+}
+twitter.endpoint = friends
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
deleted file mode 100644
index 63a6481..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
- version = 1.1
- endpoint = statuses
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
new file mode 100644
index 0000000..6954113
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterPostsPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
deleted file mode 100644
index 6e0a879..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
- version = 1.1
- endpoint = users
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
new file mode 100644
index 0000000..342a850
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterUserInformationPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
index aa2b1a9..b051e90 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -1,17 +1,22 @@
package com.peoplepattern.streams.twitter.collection
+import java.io.File
import java.nio.file.{Files, Paths}
-import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
/**
* Created by sblackmon on 3/13/16.
@@ -20,30 +25,31 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
+ import FlinkTwitterFollowingPipeline._
+
@Test
def flinkTwitterFollowersPipelineFriendsIT = {
- val testConfig : TwitterFollowingPipelineConfiguration =
- new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- testConfig.getTwitter.setEndpoint("friends")
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterFollowingPipeline(config = testConfig)
val jobThread = new Thread(job)
jobThread.start
jobThread.join
- eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends")))
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
> 90)
}
@@ -52,27 +58,26 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
@Test
def flinkTwitterFollowersPipelineFollowersIT = {
- val testConfig : TwitterFollowingPipelineConfiguration =
- new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- testConfig.getTwitter.setEndpoint("followers")
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/followers").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterFollowingPipeline(config = testConfig)
val jobThread = new Thread(job)
jobThread.start
jobThread.join
- eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterFollowingPipeline/followers")))
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/FlinkTwitterFollowingPipeline/followers", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
> 500)
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 8a942e5..a355696 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -1,12 +1,16 @@
package com.peoplepattern.streams.twitter.collection
+import java.io.File
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.slf4j.{Logger, LoggerFactory}
@@ -20,23 +24,25 @@ import org.testng.annotations.Test
/**
* Created by sblackmon on 3/13/16.
*/
-class FlinkTwitterPostsPipelineIT extends FlatSpec {
+class FlinkTwitterPostsPipelineIT extends FlatSpec {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
+ import FlinkTwitterPostsPipeline._
+
@Test
def flinkTwitterPostsPipelineIT = {
- val testConfig : TwitterPostsPipelineConfiguration =
- new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterPostsPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterPostsPipelineIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterPostsPipeline(config = testConfig)
val jobThread = new Thread(job)
@@ -44,9 +50,9 @@ class FlinkTwitterPostsPipelineIT extends FlatSpec {
jobThread.join
eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterPostsPipeline")))
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/FlinkTwitterPostsPipeline", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
>= 200)
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
new file mode 100644
index 0000000..f083f65
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -0,0 +1,57 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/**
+ * Integration test for FlinkTwitterSpritzerPipeline, configured from
+ * target/test-classes/FlinkTwitterSpritzerPipelineIT.conf; disabled via @Test(enabled = false).
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
+
+ import FlinkTwitterSpritzerPipeline._
+
+ @Test(enabled = false)
+ def flinkTwitterSpritzerPipelineIT = {
+
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterSpritzerPipelineIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
+
+ val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
+ val jobThread = new Thread(job)
+ jobThread.start
+ jobThread.join
+
+ eventually (timeout(30 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+ assert(
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+ >= 200)
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 3d21244..2ca8650 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -1,10 +1,13 @@
package com.peoplepattern.streams.twitter.collection
+import java.io.File
import java.nio.file.{Files, Paths}
-import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.scalatest.FlatSpec
import org.scalatest._
@@ -25,19 +28,21 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
+ import FlinkTwitterUserInformationPipeline._
+
@Test
def flinkTwitterUserInformationPipelineIT = {
- val testConfig : TwitterUserInformationPipelineConfiguration =
- new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/TwitterUserInformationPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterUserInformationPipelineIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterUserInformationPipeline(config = testConfig)
val jobThread = new Thread(job)
@@ -45,9 +50,9 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
jobThread.join
eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/TwitterUserInformationPipeline")))
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/TwitterUserInformationPipeline", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
> 500)
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 7054e89..6c50ca2 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -41,7 +41,4 @@
<module>flink-twitter-collection</module>
</modules>
- <build>
-
- </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 7b653fc..52cd0fc 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -76,34 +76,34 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-hdfs</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-reindex/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml
index e81cbe2..325e564 100644
--- a/local/elasticsearch-reindex/pom.xml
+++ b/local/elasticsearch-reindex/pom.xml
@@ -92,7 +92,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/mongo-elasticsearch-sync/pom.xml
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
index 318c47e..d268ed7 100644
--- a/local/mongo-elasticsearch-sync/pom.xml
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -66,7 +66,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -75,27 +75,27 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-mongo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-follow-graph/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-graph/pom.xml b/local/twitter-follow-graph/pom.xml
index d40adde..9bf980d 100644
--- a/local/twitter-follow-graph/pom.xml
+++ b/local/twitter-follow-graph/pom.xml
@@ -49,17 +49,17 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@@ -70,12 +70,12 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-graph</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-history-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/pom.xml b/local/twitter-history-elasticsearch/pom.xml
index afc8cf0..ba6dbe7 100644
--- a/local/twitter-history-elasticsearch/pom.xml
+++ b/local/twitter-history-elasticsearch/pom.xml
@@ -69,7 +69,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -78,29 +78,29 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@@ -111,7 +111,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 224bdd4..1b7b64f 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -76,32 +76,32 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-filters</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
[4/6] incubator-streams-examples git commit: improve documentation
Posted by sb...@apache.org.
improve documentation
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/58fefc07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/58fefc07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/58fefc07
Branch: refs/heads/master
Commit: 58fefc07fc09b45f0e4cfebc9b126cab1fa8a9a3
Parents: 0112a83
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Sep 29 22:02:14 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Sep 29 22:02:14 2016 -0500
----------------------------------------------------------------------
.../resources/FlinkTwitterFollowingPipeline.dot | 37 ++++++++++++
.../FlinkTwitterFollowingPipeline.dot.svg | 63 ++++++++++++++++++++
.../resources/FlinkTwitterPostsPipeline.dot | 37 ++++++++++++
.../resources/FlinkTwitterPostsPipeline.dot.svg | 63 ++++++++++++++++++++
.../resources/FlinkTwitterSpritzerPipeline.dot | 33 ++++++++++
.../FlinkTwitterSpritzerPipeline.dot.svg | 47 +++++++++++++++
.../FlinkTwitterUserInformationPipeline.dot | 37 ++++++++++++
.../FlinkTwitterUserInformationPipeline.dot.svg | 63 ++++++++++++++++++++
.../markdown/FlinkTwitterFollowingPipeline.md | 4 +-
.../site/markdown/FlinkTwitterPostsPipeline.md | 2 +-
.../markdown/FlinkTwitterSpritzerPipeline.md | 41 +++++++++++++
.../FlinkTwitterUserInformationPipeline.md | 4 +-
.../src/site/markdown/index.md | 16 ++++-
.../test/FlinkTwitterFollowingPipelineIT.scala | 4 +-
14 files changed, 444 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterFollowingProvider -> source [dir=back,style=dashed];
+ TwitterFollowingProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg
new file mode 100644
index 0000000..79bee38
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="504pt" height="203pt"
+ viewBox="0.00 0.00 504.29 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 500.286,-199 500.286,4 -4,4"/>
+<!-- source -->
+<g id="node1" class="node">
+<title>source</title>
+<polygon fill="none" stroke="black" points="275.082,-109 11.9727,-109 11.9727,-113 -0.0273492,-113 -0.0273492,-73 275.082,-73 275.082,-109"/>
+<polyline fill="none" stroke="black" points="-0.0273492,-109 11.9727,-109 "/>
+<text text-anchor="middle" x="137.527" y="-93.8" font-family="Times,serif" font-size="14.00">source</text>
+<text text-anchor="middle" x="137.527" y="-79.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${readerPath}</text>
+</g>
+<!-- TwitterFollowingProvider -->
+<g id="node2" class="node">
+<title>TwitterFollowingProvider</title>
+<g id="a_node2"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java" xlink:title="TwitterFollowingProvider">
+<ellipse fill="none" stroke="black" cx="200.527" cy="-177" rx="102.174" ry="18"/>
+<text text-anchor="middle" x="200.527" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterFollowingProvider</text>
+</a>
+</g>
+</g>
+<!-- TwitterFollowingProvider->source -->
+<g id="edge1" class="edge">
+<title>TwitterFollowingProvider->source</title>
+<path fill="none" stroke="black" stroke-dasharray="5,2" d="M181.043,-150.402C171.25,-137.034 159.635,-121.178 150.869,-109.212"/>
+<polygon fill="black" stroke="black" points="178.434,-152.763 187.167,-158.762 184.081,-148.626 178.434,-152.763"/>
+</g>
+<!-- RollingFileSink -->
+<g id="node3" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="359.527" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="359.527" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterFollowingProvider->RollingFileSink -->
+<g id="edge2" class="edge">
+<title>TwitterFollowingProvider->RollingFileSink</title>
+<path fill="none" stroke="black" d="M232.321,-159.803C257.806,-146.019 293.67,-126.621 320.824,-111.934"/>
+<polygon fill="black" stroke="black" points="322.54,-114.985 329.671,-107.149 319.21,-108.828 322.54,-114.985"/>
+<text text-anchor="middle" x="306.641" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node4" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="496.044,-36 235.01,-36 235.01,-40 223.01,-40 223.01,-0 496.044,-0 496.044,-36"/>
+<polyline fill="none" stroke="black" points="223.01,-36 235.01,-36 "/>
+<text text-anchor="middle" x="359.527" y="-20.8" font-family="Times,serif" font-size="14.00">destination</text>
+<text text-anchor="middle" x="359.527" y="-6.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink->destination -->
+<g id="edge3" class="edge">
+<title>RollingFileSink->destination</title>
+<path fill="none" stroke="black" d="M359.527,-72.9551C359.527,-64.8828 359.527,-55.1764 359.527,-46.1817"/>
+<polygon fill="black" stroke="black" points="363.027,-46.0903 359.527,-36.0904 356.027,-46.0904 363.027,-46.0903"/>
+</g>
+</g>
+</svg>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterTimelineProvider -> source [dir=back,style=dashed];
+ TwitterTimelineProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg
new file mode 100644
index 0000000..5698c45
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="504pt" height="203pt"
+ viewBox="0.00 0.00 504.29 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 500.286,-199 500.286,4 -4,4"/>
+<!-- source -->
+<g id="node1" class="node">
+<title>source</title>
+<polygon fill="none" stroke="black" points="275.082,-109 11.9727,-109 11.9727,-113 -0.0273492,-113 -0.0273492,-73 275.082,-73 275.082,-109"/>
+<polyline fill="none" stroke="black" points="-0.0273492,-109 11.9727,-109 "/>
+<text text-anchor="middle" x="137.527" y="-93.8" font-family="Times,serif" font-size="14.00">source</text>
+<text text-anchor="middle" x="137.527" y="-79.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${readerPath}</text>
+</g>
+<!-- TwitterTimelineProvider -->
+<g id="node2" class="node">
+<title>TwitterTimelineProvider</title>
+<g id="a_node2"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java" xlink:title="TwitterTimelineProvider">
+<ellipse fill="none" stroke="black" cx="200.527" cy="-177" rx="97.6831" ry="18"/>
+<text text-anchor="middle" x="200.527" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterTimelineProvider</text>
+</a>
+</g>
+</g>
+<!-- TwitterTimelineProvider->source -->
+<g id="edge1" class="edge">
+<title>TwitterTimelineProvider->source</title>
+<path fill="none" stroke="black" stroke-dasharray="5,2" d="M181.043,-150.402C171.25,-137.034 159.635,-121.178 150.869,-109.212"/>
+<polygon fill="black" stroke="black" points="178.434,-152.763 187.167,-158.762 184.081,-148.626 178.434,-152.763"/>
+</g>
+<!-- RollingFileSink -->
+<g id="node3" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="359.527" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="359.527" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterTimelineProvider->RollingFileSink -->
+<g id="edge2" class="edge">
+<title>TwitterTimelineProvider->RollingFileSink</title>
+<path fill="none" stroke="black" d="M232.321,-159.803C257.806,-146.019 293.67,-126.621 320.824,-111.934"/>
+<polygon fill="black" stroke="black" points="322.54,-114.985 329.671,-107.149 319.21,-108.828 322.54,-114.985"/>
+<text text-anchor="middle" x="306.641" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node4" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="496.044,-36 235.01,-36 235.01,-40 223.01,-40 223.01,-0 496.044,-0 496.044,-36"/>
+<polyline fill="none" stroke="black" points="223.01,-36 235.01,-36 "/>
+<text text-anchor="middle" x="359.527" y="-20.8" font-family="Times,serif" font-size="14.00">destination</text>
+<text text-anchor="middle" x="359.527" y="-6.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink->destination -->
+<g id="edge3" class="edge">
+<title>RollingFileSink->destination</title>
+<path fill="none" stroke="black" d="M359.527,-72.9551C359.527,-64.8828 359.527,-55.1764 359.527,-46.1817"/>
+<polygon fill="black" stroke="black" points="363.027,-46.0903 359.527,-36.0904 356.027,-46.0904 363.027,-46.0903"/>
+</g>
+</g>
+</svg>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //providers
+ TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+ //stream
+ TwitterStreamProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg
new file mode 100644
index 0000000..960a11f
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="282pt" height="203pt"
+ viewBox="0.00 0.00 281.52 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 277.517,-199 277.517,4 -4,4"/>
+<!-- TwitterStreamProvider -->
+<g id="node1" class="node">
+<title>TwitterStreamProvider</title>
+<g id="a_node1"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java" xlink:title="TwitterStreamProvider">
+<ellipse fill="none" stroke="black" cx="136.758" cy="-177" rx="91.0473" ry="18"/>
+<text text-anchor="middle" x="136.758" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterStreamProvider</text>
+</a>
+</g>
+</g>
+<!-- RollingFileSink -->
+<g id="node2" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="136.758" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="136.758" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterStreamProvider->RollingFileSink -->
+<g id="edge1" class="edge">
+<title>TwitterStreamProvider->RollingFileSink</title>
+<path fill="none" stroke="black" d="M136.758,-158.762C136.758,-147.36 136.758,-132.434 136.758,-119.494"/>
+<polygon fill="black" stroke="black" points="140.258,-119.212 136.758,-109.212 133.258,-119.212 140.258,-119.212"/>
+<text text-anchor="middle" x="153.872" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node3" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="273.275,-36 0.241273,-36 0.241273,-0 273.275,-0 273.275,-36"/>
+<text text-anchor="middle" x="136.758" y="-13.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink->destination -->
+<g id="edge2" class="edge">
+<title>RollingFileSink->destination</title>
+<path fill="none" stroke="black" d="M136.758,-72.9551C136.758,-64.8828 136.758,-55.1764 136.758,-46.1817"/>
+<polygon fill="black" stroke="black" points="140.258,-46.0903 136.758,-36.0904 133.258,-46.0904 140.258,-46.0903"/>
+</g>
+</g>
+</svg>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterUserInformationProvider -> source [dir=back,style=dashed];
+ TwitterUserInformationProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg
new file mode 100644
index 0000000..9dadc63
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="504pt" height="203pt"
+ viewBox="0.00 0.00 504.29 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 500.286,-199 500.286,4 -4,4"/>
+<!-- source -->
+<g id="node1" class="node">
+<title>source</title>
+<polygon fill="none" stroke="black" points="275.082,-109 11.9727,-109 11.9727,-113 -0.0273492,-113 -0.0273492,-73 275.082,-73 275.082,-109"/>
+<polyline fill="none" stroke="black" points="-0.0273492,-109 11.9727,-109 "/>
+<text text-anchor="middle" x="137.527" y="-93.8" font-family="Times,serif" font-size="14.00">source</text>
+<text text-anchor="middle" x="137.527" y="-79.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${readerPath}</text>
+</g>
+<!-- TwitterUserInformationProvider -->
+<g id="node2" class="node">
+<title>TwitterUserInformationProvider</title>
+<g id="a_node2"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java" xlink:title="TwitterUserInformationProvider">
+<ellipse fill="none" stroke="black" cx="200.527" cy="-177" rx="124.824" ry="18"/>
+<text text-anchor="middle" x="200.527" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterUserInformationProvider</text>
+</a>
+</g>
+</g>
+<!-- TwitterUserInformationProvider->source -->
+<g id="edge1" class="edge">
+<title>TwitterUserInformationProvider->source</title>
+<path fill="none" stroke="black" stroke-dasharray="5,2" d="M181.043,-150.402C171.25,-137.034 159.635,-121.178 150.869,-109.212"/>
+<polygon fill="black" stroke="black" points="178.434,-152.763 187.167,-158.762 184.081,-148.626 178.434,-152.763"/>
+</g>
+<!-- RollingFileSink -->
+<g id="node3" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="359.527" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="359.527" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterUserInformationProvider->RollingFileSink -->
+<g id="edge2" class="edge">
+<title>TwitterUserInformationProvider->RollingFileSink</title>
+<path fill="none" stroke="black" d="M232.703,-159.597C258.14,-145.839 293.738,-126.584 320.747,-111.975"/>
+<polygon fill="black" stroke="black" points="322.42,-115.05 329.551,-107.214 319.09,-108.893 322.42,-115.05"/>
+<text text-anchor="middle" x="306.641" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node4" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="496.044,-36 235.01,-36 235.01,-40 223.01,-40 223.01,-0 496.044,-0 496.044,-36"/>
+<polyline fill="none" stroke="black" points="223.01,-36 235.01,-36 "/>
+<text text-anchor="middle" x="359.527" y="-20.8" font-family="Times,serif" font-size="14.00">destination</text>
+<text text-anchor="middle" x="359.527" y="-6.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink->destination -->
+<g id="edge3" class="edge">
+<title>RollingFileSink->destination</title>
+<path fill="none" stroke="black" d="M359.527,-72.9551C359.527,-64.8828 359.527,-55.1764 359.527,-46.1817"/>
+<polygon fill="black" stroke="black" points="363.027,-46.0903 359.527,-36.0904 356.027,-46.0904 363.027,-46.0903"/>
+</g>
+</g>
+</svg>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
index 22f30f5..3ad23d3 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
@@ -19,7 +19,9 @@ Diagram:
Example Configuration:
----------------------
-[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" )
+[FlinkTwitterFollowingPipelineFollowersIT.conf](FlinkTwitterFollowingPipelineFollowersIT.conf "FlinkTwitterFollowingPipelineFollowersIT.conf" )
+
+[FlinkTwitterFollowingPipelineFriendsIT.conf](FlinkTwitterFollowingPipelineFriendsIT.conf "FlinkTwitterFollowingPipelineFriendsIT.conf" )
Run (Local):
------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
index 5f77994..fe6b544 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
@@ -19,7 +19,7 @@ Diagram:
Example Configuration:
----------------------
-[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" )
+[FlinkTwitterPostsPipelineIT.conf](FlinkTwitterPostsPipelineIT.conf "FlinkTwitterPostsPipelineIT.conf" )
Run (Local):
------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
new file mode 100644
index 0000000..259fe7f
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterSpritzerPipeline
+============================
+
+Description:
+-----------------
+
+Collects twitter posts in real-time from the sample endpoint with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterSpritzerPipeline.dot](FlinkTwitterSpritzerPipeline.dot "FlinkTwitterSpritzerPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterSpritzerPipeline.dot.svg](./FlinkTwitterSpritzerPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterSpritzerPipelineIT.conf](FlinkTwitterSpritzerPipelineIT.conf "FlinkTwitterSpritzerPipelineIT.conf" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
index 5e0d1fe..a465de9 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -4,7 +4,7 @@ FlinkTwitterUserInformationPipeline
Description:
-----------------
-Collects twitter users with flink.
+Collects twitter user profiles with flink.
Specification:
-----------------
@@ -19,7 +19,7 @@ Diagram:
Example Configuration:
----------------------
-[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+[FlinkTwitterUserInformationPipelineIT.conf](FlinkTwitterUserInformationPipelineIT.conf "FlinkTwitterUserInformationPipelineIT.conf" )
Run (Local):
------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
index 19e44cf..0f15603 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -22,10 +22,24 @@ Streams:
<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+Test:
+-----
+
+Create a local file `application.conf` with valid twitter credentials
+
+ twitter {
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ }
+
Build:
---------
- mvn clean install verify
+ mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
[JavaDocs](apidocs/index.html "JavaDocs")
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
index b051e90..e6294f6 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -27,7 +27,7 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
import FlinkTwitterFollowingPipeline._
- @Test
+ @Test(enabled = false)
def flinkTwitterFollowersPipelineFriendsIT = {
val reference: Config = ConfigFactory.load()
@@ -55,7 +55,7 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
}
- @Test
+ @Test(enabled = false)
def flinkTwitterFollowersPipelineFollowersIT = {
val reference: Config = ConfigFactory.load()
[5/6] incubator-streams-examples git commit: all five flink examples
passing
Posted by sb...@apache.org.
all five flink examples passing
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/9dcdf645
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/9dcdf645
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/9dcdf645
Branch: refs/heads/master
Commit: 9dcdf645080302d2f8e1bc7dc3d312817d459cf5
Parents: 58fefc0
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 16:42:25 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 16:42:25 2016 -0500
----------------------------------------------------------------------
flink/flink-twitter-collection/pom.xml | 28 ++++---
.../FlinkTwitterFollowingPipeline.scala | 2 +-
.../collection/FlinkTwitterPostsPipeline.scala | 2 +-
.../FlinkTwitterSpritzerPipeline.scala | 28 +++++--
.../FlinkTwitterUserInformationPipeline.scala | 2 +-
.../markdown/FlinkTwitterSpritzerPipeline.md | 6 +-
.../src/site/markdown/index.md | 6 +-
.../resources/FlinkTwitterFollowingPipeline.dot | 37 +++++++++
.../resources/FlinkTwitterPostsPipeline.dot | 37 +++++++++
.../resources/FlinkTwitterSpritzerPipeline.dot | 33 ++++++++
.../FlinkTwitterUserInformationPipeline.dot | 37 +++++++++
...linkTwitterFollowingPipelineFollowersIT.conf | 6 +-
.../FlinkTwitterFollowingPipelineFriendsIT.conf | 5 +-
.../FlinkTwitterSpritzerPipelineIT.conf | 15 ++++
.../FlinkTwitterUserInformationPipelineIT.conf | 2 +-
...inkTwitterFollowingPipelineFollowersIT.scala | 55 +++++++++++++
...FlinkTwitterFollowingPipelineFriendsIT.scala | 59 ++++++++++++++
.../test/FlinkTwitterFollowingPipelineIT.scala | 86 --------------------
.../test/FlinkTwitterSpritzerPipelineIT.scala | 9 +-
19 files changed, 336 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 2d35035..4cf0b89 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -448,16 +448,24 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
- <version>2.12.4</version>
- <executions>
- <execution>
- <id>integration-tests</id>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
+ <configuration>
+ <!-- Run integration test suite rather than individual tests. -->
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ <exclude>**/*Tests.java</exclude>
+ </excludes>
+ <includes>
+ <include>**/*IT.java</include>
+ <include>**/*ITs.java</include>
+ </includes>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-testng</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ </dependency>
+ </dependencies>
</plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2fd9336..a20078e 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -126,7 +126,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
// if( test == true ) jsons.print();
- env.execute("FlinkTwitterFollowingPipeline")
+ env.execute(STREAMS_ID)
}
class FollowingCollectorFlatMapFunction(
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index beea973..bb7d54c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -134,7 +134,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
// if( test == true ) jsons.print();
- env.execute("FlinkTwitterPostsPipeline")
+ env.execute(STREAMS_ID)
}
class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index b615806..d6ed3df 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -1,10 +1,12 @@
package org.apache.streams.examples.flink.twitter.collection
+import java.io.Serializable
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
@@ -21,6 +23,7 @@ import org.apache.streams.twitter.TwitterStreamConfiguration
import org.apache.streams.twitter.provider.TwitterStreamProvider
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.scala._
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import scala.collection.JavaConversions._
@@ -82,6 +85,8 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
import FlinkTwitterSpritzerPipeline._
+ val spritzerSource = new SpritzerSource(config.getTwitter)
+
override def run(): Unit = {
val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
@@ -91,7 +96,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
val outPath = buildWriterPath(config.getDestination)
- val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter));
+ val streamSource : DataStream[String] = env.addSource(spritzerSource);
if( config.getTest == false )
streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
@@ -101,15 +106,23 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
// if( test == true ) jsons.print();
- env.execute("FlinkTwitterPostsPipeline")
+ env.execute(STREAMS_ID)
+
+ }
+
+ def stop(): Unit = {
+ spritzerSource.stop()
}
- class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+ class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+
+ var mapper: ObjectMapper = _
var twitProvider: TwitterStreamProvider = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
+ mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
twitProvider = new TwitterStreamProvider( sourceConfig )
twitProvider.prepare(twitProvider)
twitProvider.startStream()
@@ -120,17 +133,16 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
do {
Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
iterator = twitProvider.readCurrent().iterator()
- iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+ iterator.toList.map(datum => ctx.collect(mapper.writeValueAsString(datum.getDocument)))
} while( twitProvider.isRunning )
}
override def cancel(): Unit = {
- twitProvider.cleanUp()
+ close()
}
- @throws[Exception]
- override def close(): Unit = {
- twitProvider.cleanUp()
+ override def stop(): Unit = {
+ close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 867255d..ad0315a 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -128,7 +128,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
- env.execute("FlinkTwitterUserInformationPipeline")
+ env.execute(STREAMS_ID)
}
class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
index 259fe7f..1e59039 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -24,17 +24,17 @@ Example Configuration:
Run (Local):
------------
- java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
Run (Flink):
------------
- flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file>
Run (YARN):
-----------
- flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file>
[JavaDocs](apidocs/index.html "JavaDocs")
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
index 0f15603..24783be 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -16,11 +16,13 @@ Collects large batches of documents from api.twitter.com from a seed set of ids.
Streams:
--------
-<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
-<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+<a href="FlinkTwitterSpritzerPipeline.html" target="_self">FlinkTwitterSpritzerPipeline</a>
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
Test:
-----
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterFollowingProvider -> source [dir=back,style=dashed];
+ TwitterFollowingProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterTimelineProvider -> source [dir=back,style=dashed];
+ TwitterTimelineProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //providers
+ TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+ //stream
+ TwitterStreamProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //source
+ source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+ //providers
+ TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"];
+
+ //persisters
+ RollingFileSink [label="RollingFileSink",shape=ellipse];
+
+ //data
+ destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+ //stream
+ TwitterUserInformationProvider -> source [dir=back,style=dashed];
+ TwitterUserInformationProvider -> RollingFileSink [label="String"];
+ RollingFileSink -> destination;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 87057be..3e922ab 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -10,7 +10,11 @@ destination {
path = "target/test-classes"
writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
}
-twitter.endpoint = friends
+twitter {
+ endpoint = followers
+ ids_only = true
+ max_items = 5000
+}
providerWaitMs = 1000
local = true
test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index b5212ed..038a8dc 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -10,7 +10,10 @@ destination {
path = "target/test-classes"
writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
}
-twitter.endpoint = friends
+twitter {
+ endpoint = friends
+ ids_only = true
+}
providerWaitMs = 1000
local = true
test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
new file mode 100644
index 0000000..fec4769
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
@@ -0,0 +1,15 @@
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterSpritzerPipelineIT"
+}
+twitter {
+ endpoint = sample
+ track = [
+ "data"
+ ]
+}
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index 342a850..d3663fe 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -2,7 +2,7 @@ source {
fields = ["ID"]
scheme = file
path = "target/test-classes"
- readerPath = "asf.txt"
+ readerPath = "1000twitterids.txt"
}
destination {
fields = ["DOC"]
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
new file mode 100644
index 0000000..f38ad92
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -0,0 +1,55 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
+
+ import FlinkTwitterFollowingPipeline._
+
+ @Test
+ def flinkTwitterFollowersPipelineFollowersIT = {
+
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
+
+ val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+ val jobThread = new Thread(job)
+ jobThread.start
+ jobThread.join
+
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+ assert(
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+ > 4000)
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
new file mode 100644
index 0000000..464e743
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -0,0 +1,59 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.junit.Ignore
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+
+/**
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
+
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
+
+ import FlinkTwitterFollowingPipeline._
+
+ @Test
+ def flinkTwitterFollowersPipelineFriendsIT = {
+
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
+
+ val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+ val jobThread = new Thread(job)
+ jobThread.start
+ jobThread.join
+
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+ assert(
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+ > 90)
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
deleted file mode 100644
index e6294f6..0000000
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.peoplepattern.streams.twitter.collection
-
-import java.io.File
-import java.nio.file.{Files, Paths}
-
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-import org.testng.annotations.Test
-
-import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
-
-/**
- * Created by sblackmon on 3/13/16.
- */
-class FlinkTwitterFollowingPipelineIT extends FlatSpec {
-
- private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
-
- import FlinkTwitterFollowingPipeline._
-
- @Test(enabled = false)
- def flinkTwitterFollowersPipelineFriendsIT = {
-
- val reference: Config = ConfigFactory.load()
- val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
- assert(conf_file.exists())
- val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
- val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
- val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
- setup(testConfig)
-
- val job = new FlinkTwitterFollowingPipeline(config = testConfig)
- val jobThread = new Thread(job)
- jobThread.start
- jobThread.join
-
- eventually (timeout(60 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
- assert(
- Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- > 90)
- }
-
- }
-
- @Test(enabled = false)
- def flinkTwitterFollowersPipelineFollowersIT = {
-
- val reference: Config = ConfigFactory.load()
- val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
- assert(conf_file.exists())
- val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
- val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
- val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
- setup(testConfig)
-
- val job = new FlinkTwitterFollowingPipeline(config = testConfig)
- val jobThread = new Thread(job)
- jobThread.start
- jobThread.join
-
- eventually (timeout(60 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
- assert(
- Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- > 500)
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index f083f65..2e2e9b1 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -26,7 +26,7 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
import FlinkTwitterSpritzerPipeline._
- @Test(enabled = false)
+ @Test
def flinkTwitterSpritzerPipelineIT = {
val reference: Config = ConfigFactory.load()
@@ -43,13 +43,14 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
val jobThread = new Thread(job)
jobThread.start
- jobThread.join
+ jobThread.join(30000)
+ job.stop()
- eventually (timeout(30 seconds), interval(1 seconds)) {
+ eventually (timeout(60 seconds), interval(1 seconds)) {
assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- >= 200)
+ >= 10)
}
}
[6/6] incubator-streams-examples git commit: Merge branch 'flink'
Posted by sb...@apache.org.
Merge branch 'flink'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/8fe6860f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/8fe6860f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/8fe6860f
Branch: refs/heads/master
Commit: 8fe6860f7e0587f595409914bba61ff6a74cbcf5
Parents: 6e93a8f 9dcdf64
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sun Oct 9 16:35:21 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sun Oct 9 16:35:21 2016 -0500
----------------------------------------------------------------------
flink/flink-twitter-collection/README.md | 8 +
flink/flink-twitter-collection/pom.xml | 473 +++++++++
.../jsonschema/FlinkBatchConfiguration.json | 12 +
.../jsonschema/FlinkStreamingConfiguration.json | 40 +
.../jsonschema/StreamsFlinkConfiguration.json | 48 +
.../TwitterFollowingPipelineConfiguration.json | 29 +
.../TwitterPostsPipelineConfiguration.json | 29 +
.../TwitterSpritzerPipelineConfiguration.json | 29 +
...terUserInformationPipelineConfiguration.json | 29 +
.../resources/FlinkTwitterFollowingPipeline.dot | 37 +
.../FlinkTwitterFollowingPipeline.dot.svg | 63 ++
.../resources/FlinkTwitterPostsPipeline.dot | 37 +
.../resources/FlinkTwitterPostsPipeline.dot.svg | 63 ++
.../resources/FlinkTwitterSpritzerPipeline.dot | 33 +
.../FlinkTwitterSpritzerPipeline.dot.svg | 47 +
.../FlinkTwitterUserInformationPipeline.dot | 37 +
.../FlinkTwitterUserInformationPipeline.dot.svg | 63 ++
.../streams/examples/flink/FlinkBase.scala | 200 ++++
.../FlinkTwitterFollowingPipeline.scala | 156 +++
.../collection/FlinkTwitterPostsPipeline.scala | 161 +++
.../FlinkTwitterSpritzerPipeline.scala | 150 +++
.../FlinkTwitterUserInformationPipeline.scala | 159 +++
.../markdown/FlinkTwitterFollowingPipeline.md | 43 +
.../site/markdown/FlinkTwitterPostsPipeline.md | 41 +
.../markdown/FlinkTwitterSpritzerPipeline.md | 41 +
.../FlinkTwitterUserInformationPipeline.md | 41 +
.../src/site/markdown/index.md | 48 +
.../site/resources/FlinkBatchConfiguration.json | 12 +
.../resources/FlinkStreamingConfiguration.json | 40 +
.../resources/FlinkTwitterFollowingPipeline.dot | 37 +
.../resources/FlinkTwitterPostsPipeline.dot | 37 +
.../resources/FlinkTwitterSpritzerPipeline.dot | 33 +
.../FlinkTwitterUserInformationPipeline.dot | 37 +
.../resources/StreamsFlinkConfiguration.json | 48 +
.../TwitterFollowingBatchConfiguration.json | 23 +
.../TwitterFollowingPipelineConfiguration.json | 29 +
.../TwitterPostsBatchConfiguration.json | 23 +
.../TwitterPostsPipelineConfiguration.json | 29 +
.../TwitterSpritzerPipelineConfiguration.json | 29 +
...witterUserInformationBatchConfiguration.json | 23 +
...terUserInformationPipelineConfiguration.json | 29 +
.../src/test/resources/1000twitterids.txt | 1000 ++++++++++++++++++
...linkTwitterFollowingPipelineFollowersIT.conf | 20 +
.../FlinkTwitterFollowingPipelineFriendsIT.conf | 19 +
.../resources/FlinkTwitterPostsPipelineIT.conf | 15 +
.../FlinkTwitterSpritzerPipelineIT.conf | 15 +
.../FlinkTwitterUserInformationPipelineIT.conf | 15 +
.../src/test/resources/asf.txt | 1 +
...inkTwitterFollowingPipelineFollowersIT.scala | 55 +
...FlinkTwitterFollowingPipelineFriendsIT.scala | 59 ++
.../test/FlinkTwitterPostsPipelineIT.scala | 61 ++
.../test/FlinkTwitterSpritzerPipelineIT.scala | 58 +
.../FlinkTwitterUserInformationPipelineIT.scala | 61 ++
flink/pom.xml | 44 +
local/elasticsearch-hdfs/pom.xml | 14 +-
local/elasticsearch-reindex/pom.xml | 2 +-
local/mongo-elasticsearch-sync/pom.xml | 12 +-
local/twitter-follow-graph/pom.xml | 10 +-
local/twitter-history-elasticsearch/pom.xml | 14 +-
local/twitter-userstream-elasticsearch/pom.xml | 14 +-
pom.xml | 29 +-
61 files changed, 4017 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8fe6860f/pom.xml
----------------------------------------------------------------------
[2/6] incubator-streams-examples git commit: flink example
Posted by sb...@apache.org.
flink example
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4491cfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4491cfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4491cfe1
Branch: refs/heads/master
Commit: 4491cfe1d0bf7324073537e89e7e8b6ed8ab43d5
Parents: b3429dd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Sep 26 12:43:22 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Sep 26 12:43:22 2016 -0500
----------------------------------------------------------------------
flink/flink-twitter-collection/README.md | 8 +
flink/flink-twitter-collection/pom.xml | 420 ++++++++
.../jsonschema/FlinkBatchConfiguration.json | 12 +
.../jsonschema/FlinkStreamingConfiguration.json | 40 +
.../jsonschema/StreamsFlinkConfiguration.json | 48 +
.../TwitterFollowingPipelineConfiguration.json | 29 +
.../TwitterPostsPipelineConfiguration.json | 29 +
...terUserInformationPipelineConfiguration.json | 29 +
.../streams/examples/flink/FlinkBase.scala | 200 ++++
.../FlinkTwitterFollowingPipeline.scala | 149 +++
.../collection/FlinkTwitterPostsPipeline.scala | 165 +++
.../FlinkTwitterUserInformationPipeline.scala | 163 +++
.../markdown/FlinkTwitterFollowingPipeline.md | 41 +
.../site/markdown/FlinkTwitterPostsPipeline.md | 41 +
.../FlinkTwitterUserInformationPipeline.md | 41 +
.../src/site/markdown/index.md | 32 +
.../site/resources/FlinkBatchConfiguration.json | 12 +
.../resources/FlinkStreamingConfiguration.json | 40 +
.../resources/StreamsFlinkConfiguration.json | 48 +
.../TwitterFollowingBatchConfiguration.json | 23 +
.../TwitterFollowingPipelineConfiguration.json | 29 +
.../TwitterPostsBatchConfiguration.json | 23 +
.../TwitterPostsPipelineConfiguration.json | 29 +
...witterUserInformationBatchConfiguration.json | 23 +
...terUserInformationPipelineConfiguration.json | 29 +
.../src/test/resources/1000twitterids.txt | 1000 ++++++++++++++++++
.../FlinkTwitterFollowingPipeline.conf | 10 +
.../resources/FlinkTwitterPostsPipeline.conf | 10 +
.../FlinkTwitterUserInformationPipeline.conf | 10 +
.../src/test/resources/asf.txt | 1 +
.../test/FlinkTwitterFollowingPipelineIT.scala | 81 ++
.../test/FlinkTwitterPostsPipelineIT.scala | 55 +
.../FlinkTwitterUserInformationPipelineIT.scala | 56 +
flink/pom.xml | 47 +
pom.xml | 29 +-
35 files changed, 2988 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/README.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/README.md b/flink/flink-twitter-collection/README.md
new file mode 100644
index 0000000..f9fe687
--- /dev/null
+++ b/flink/flink-twitter-collection/README.md
@@ -0,0 +1,8 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+org.apache.streams:flink-twitter-collection
+===========================================
+
+[README.md](src/site/markdown/index.md "README")
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
new file mode 100644
index 0000000..33b05fe
--- /dev/null
+++ b/flink/flink-twitter-collection/pom.xml
@@ -0,0 +1,420 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-examples-flink</artifactId>
+ <version>0.4-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-twitter-collection</artifactId>
+ <name>flink-twitter-collection</name>
+
+ <description>Collects twitter documents using flink.</description>
+
+ <properties>
+ <docker.repo>apachestreams</docker.repo>
+ <hdfs.version>2.7.0</hdfs.version>
+ <flink.version>1.1.2</flink.version>
+ <scala.suffix>2.10</scala.suffix>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-provider-twitter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-hdfs</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hdfs.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-filesystem_2.10</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>data</directory>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <!-- This binary runs with logback -->
+ <!-- Keep log4j out -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce-banned-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <bannedDependencies>
+ <excludes>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.slf4j:slf4j-jcl</exclude>
+ <exclude>org.slf4j:slf4j-jdk14</exclude>
+ <exclude>org.log4j:log4j</exclude>
+ <exclude>commons-logging:commons-logging</exclude>
+ </excludes>
+ </bannedDependencies>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <version>0.4.1</version>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ <includeGroupIds>org.apache.streams</includeGroupIds>
+ <includeTypes>test-jar</includeTypes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test-resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12.4</version>
+ <executions>
+ <execution>
+ <id>integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType": "org.apache.streams.flink.FlinkBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "Configuration for Flink batch jobs. All settings are inherited from StreamsFlinkConfiguration.",
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType": "org.apache.streams.flink.FlinkStreamingConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "Configuration for Flink streaming jobs. All settings are inherited from StreamsFlinkConfiguration, which owns their defaults.",
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
@@ -0,0 +1,55 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType": "org.apache.streams.flink.StreamsFlinkConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "Settings shared by the Flink batch and streaming job configurations.",
+  "extends": {
+    "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "description": "parallelism of the local execution environment used when test or local is set",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "description": "milliseconds to sleep between successive reads from a provider",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "description": "interval in milliseconds between checkpoints of a cluster streaming job",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "description": "milliseconds a checkpoint may take before it is discarded",
+      "default": 30000
+    },
+    "test": {
+      "type": "boolean",
+      "description": "test mode: use a local execution environment and write output with writeAsText instead of a rolling sink",
+      "default": false
+    },
+    "local": {
+      "type": "boolean",
+      "description": "use a local execution environment instead of the cluster environment",
+      "default": true
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds between restart attempts",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,30 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType": "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "Configuration for FlinkTwitterFollowingPipeline. Flink settings such as providerWaitMs are inherited from FlinkStreamingConfiguration; redeclaring them here would shadow the inherited defaults.",
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration",
+      "description": "configuration handed to the TwitterFollowingProvider for each id"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration",
+      "description": "location of the input text file, one twitter id per line"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration",
+      "description": "location where collected Follow documents are written as JSON"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,30 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType": "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "Configuration for FlinkTwitterPostsPipeline. Flink settings such as providerWaitMs are inherited from FlinkStreamingConfiguration; redeclaring them here would shadow the inherited defaults.",
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration",
+      "description": "configuration handed to the TwitterTimelineProvider for each id"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration",
+      "description": "location of the input text file, one twitter id per line"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration",
+      "description": "location where collected tweets are written as JSON"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,30 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType": "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "Configuration for FlinkTwitterUserInformationPipeline. Flink settings such as providerWaitMs are inherited from FlinkStreamingConfiguration; redeclaring them here would shadow the inherited defaults.",
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration",
+      "description": "twitter user information collection settings"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration",
+      "description": "input location read by the pipeline"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration",
+      "description": "output location written by the pipeline"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
new file mode 100644
index 0000000..1f1ed6d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -0,0 +1,229 @@
+package org.apache.streams.examples.flink
+
+import java.net.MalformedURLException
+
+import com.google.common.base.Strings
+import com.typesafe.config.Config
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.slf4j.LoggerFactory
+
+/**
+ * Shared plumbing for the Flink example jobs: resolves the typesafe
+ * configuration, creates batch or streaming execution environments, and
+ * builds input/output paths from HDFS reader/writer configurations.
+ */
+trait FlinkBase {
+
+  private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
+  private val MAPPER = StreamsJacksonMapper.getInstance()
+
+  var configUrl : String = _
+  var typesafe : Config = _
+  var streamsConfig = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig)
+  var streamsFlinkConfiguration: StreamsFlinkConfiguration = _
+
+  var executionEnvironment: ExecutionEnvironment = _
+  var streamExecutionEnvironment: StreamExecutionEnvironment = _
+
+  /**
+   * Basic setup shared by every flink job.
+   *
+   * The first argument, if any, is the configuration URL; without arguments
+   * the default StreamsConfigurator configuration is used. Either way setup()
+   * runs, so `typesafe` is set for subclasses unless the URL cannot be resolved.
+   */
+  def main(args: Array[String]): Unit = {
+    BASELOGGER.info("Args: {}", args)
+    configUrl = args.headOption.orNull
+    setup(configUrl)
+  }
+
+  /**
+   * Resolves the configuration at `configUrl`, layered over the
+   * StreamsConfigurator defaults, and passes it to `setup(Config)`.
+   *
+   * @param configUrl configuration URL; null or empty selects the default configuration
+   * @return false if the URL cannot be resolved or `setup(Config)` fails
+   */
+  def setup(configUrl : String): Boolean = {
+    BASELOGGER.info("StreamsConfigurator.config: {}", StreamsConfigurator.config)
+    if( !Strings.isNullOrEmpty(configUrl)) {
+      try {
+        // resolve inside the try so a bad URL is reported here instead of escaping from a log call
+        val resolved = StreamsConfigurator.resolveConfig(configUrl)
+        BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", resolved)
+        typesafe = resolved.withFallback(StreamsConfigurator.config).resolve()
+      } catch {
+        case mue: MalformedURLException =>
+          BASELOGGER.error("Invalid Configuration URL: ", mue)
+          return false
+        case e: Exception =>
+          BASELOGGER.error("Invalid Configuration URL: ", e)
+          return false
+      }
+    }
+    else {
+      typesafe = StreamsConfigurator.getConfig
+    }
+
+    setup(typesafe)
+  }
+
+  /**
+   * Creates the execution environment selected by the `mode` setting:
+   * "streaming" or "batch".
+   *
+   * @return false if `mode` is missing or unrecognized, or setup fails
+   */
+  def setup(typesafe : Config): Boolean = {
+    this.typesafe = typesafe
+
+    BASELOGGER.info("Typesafe Config: {}", typesafe)
+
+    if( typesafe == null || !typesafe.hasPath("mode") ) {
+      BASELOGGER.error("Configuration has no mode; expected streaming or batch")
+      return false
+    }
+
+    typesafe.getString("mode") match {
+      case "streaming" =>
+        val streamingConfiguration: FlinkStreamingConfiguration =
+          new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe)
+        setupStreaming(streamingConfiguration)
+      case "batch" =>
+        val batchConfiguration: FlinkBatchConfiguration =
+          new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe)
+        setupBatch(batchConfiguration)
+      case other =>
+        BASELOGGER.error("Unrecognized mode: {}", other)
+        false
+    }
+  }
+
+  /**
+   * Records the streaming configuration and creates the stream execution
+   * environment unless one already exists.
+   *
+   * @return true on success, false if the configuration is null
+   */
+  def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
+
+    BASELOGGER.info("FsStreamingFlinkConfiguration: " + streamingConfiguration)
+
+    this.streamsFlinkConfiguration = streamingConfiguration
+
+    if( streamsFlinkConfiguration == null) return false
+
+    if( streamExecutionEnvironment == null )
+      streamExecutionEnvironment = streamEnvironment(streamingConfiguration)
+
+    // report success, matching setupBatch
+    true
+  }
+
+  /**
+   * Records the batch configuration and creates the batch execution
+   * environment unless one already exists.
+   *
+   * @return true on success, false if the configuration is null
+   */
+  def setupBatch(batchConfiguration: FlinkBatchConfiguration): Boolean = {
+
+    BASELOGGER.info("FsBatchFlinkConfiguration: " + batchConfiguration)
+
+    this.streamsFlinkConfiguration = batchConfiguration
+
+    if( streamsFlinkConfiguration == null) return false
+
+    if( executionEnvironment == null )
+      executionEnvironment = batchEnvironment(batchConfiguration)
+
+    true
+  }
+
+  /**
+   * Returns the cluster environment when both `test` and `local` are false,
+   * otherwise a local environment with parallelism `parallel`.
+   */
+  def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = {
+    if (config.getTest == false && config.getLocal == false)
+      ExecutionEnvironment.getExecutionEnvironment
+    else
+      ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+  }
+
+  /**
+   * Returns the cluster environment, with restarts disabled and
+   * exactly-once checkpointing, when both `test` and `local` are false;
+   * otherwise a local environment with parallelism `parallel`.
+   *
+   * NOTE(review): restartAttempts / restartDelayMs are not applied here.
+   */
+  def streamEnvironment(config: FlinkStreamingConfiguration = new FlinkStreamingConfiguration()) : StreamExecutionEnvironment = {
+    if( config.getTest == false && config.getLocal == false) {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      env.setRestartStrategy(RestartStrategies.noRestart())
+
+      // start a checkpoint every checkpointIntervalMs (schema default: 300000 ms, i.e. 5 minutes)
+      env.enableCheckpointing(config.getCheckpointIntervalMs)
+
+      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+      // checkpoints must complete within checkpointTimeoutMs (schema default: 30000 ms), or are discarded
+      env.getCheckpointConfig.setCheckpointTimeout(config.getCheckpointTimeoutMs)
+
+      // allow only one checkpoint to be in progress at the same time
+      env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+
+      env
+    }
+    else StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+  }
+
+  /** Builds the input path for `configObject`; throws if its scheme is not file, hdfs or s3. */
+  def buildReaderPath(configObject: HdfsReaderConfiguration) : String =
+    buildPath(configObject, configObject.getReaderPath, "scheme not recognized: ")
+
+  /** Builds the output path for `configObject`; throws if its scheme is not file, hdfs or s3. */
+  def buildWriterPath(configObject: HdfsWriterConfiguration) : String =
+    buildPath(configObject, configObject.getWriterPath, "output scheme not recognized: ")
+
+  /**
+   * Path layout shared by readers and writers:
+   * file -> path/leaf; hdfs -> <scheme>://host:port/path/leaf; s3 -> <scheme>://path/leaf.
+   */
+  private def buildPath(configObject: HdfsConfiguration, leaf: String, errorPrefix: String) : String = {
+    val scheme = configObject.getScheme
+    if (scheme == null)
+      throw new Exception(errorPrefix + scheme)
+    else if (scheme.equals(HdfsConfiguration.Scheme.FILE))
+      configObject.getPath + "/" + leaf
+    else if (scheme.equals(HdfsConfiguration.Scheme.HDFS))
+      scheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + leaf
+    else if (scheme.toString.equals("s3"))
+      scheme + "://" + configObject.getPath + "/" + leaf
+    else
+      throw new Exception(errorPrefix + scheme)
+  }
+
+  /**
+   * Strips a leading "@", or everything up to the last ':' (e.g.
+   * "twitter:12345" -> "12345"), to get the id handed to a provider.
+   */
+  def toProviderId(input : String) : String = {
+    if( input.startsWith("@") )
+      input.substring(1)
+    else if( input.contains(':'))
+      input.substring(input.lastIndexOf(':')+1)
+    else input
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
new file mode 100644
index 0000000..2ac7d32
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -0,0 +1,149 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterFollowingConfiguration
+import org.apache.streams.twitter.pojo.Follow
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink streaming job that reads twitter ids from a text file, collects
+ * Follow documents for each id with TwitterFollowingProvider, and writes
+ * them out as JSON.
+ *
+ * Created by sblackmon on 4/20/16.
+ */
+object FlinkTwitterFollowingPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Resolves and validates the job configuration, then runs the pipeline
+   * on its own thread and waits for it to finish.
+   */
+  override def main(args: Array[String]) = {
+    super.main(args)
+    // typesafe can still be null if FlinkBase.main did not resolve a configuration
+    val typesafeConfig = if( typesafe != null ) typesafe else StreamsConfigurator.getConfig
+    val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafeConfig)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  /**
+   * Validates the job configuration: it and its source, destination and
+   * twitter sections must all be non-null.
+   *
+   * @return false, after logging the reason, if any of them is missing
+   */
+  def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = {
+
+    LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) return reportInvalid("jobConfig is null!")
+    if( jobConfig.getSource == null ) return reportInvalid("jobConfig.getSource is null!")
+    if( jobConfig.getDestination == null ) return reportInvalid("jobConfig.getDestination is null!")
+    if( jobConfig.getTwitter == null ) return reportInvalid("jobConfig.getTwitter is null!")
+
+    true
+  }
+
+  /** Reports a configuration error to the log and to stderr, then returns false. */
+  private def reportInvalid(message: String): Boolean = {
+    LOGGER.error(message)
+    System.err.println(message)
+    false
+  }
+
+}
+
+class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterFollowingPipeline._
+
+  /**
+   * Builds and executes the job graph: ids read from a text file are keyed,
+   * expanded into Follow documents, serialized to JSON and written out.
+   */
+  override def run(): Unit = {
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+    val outPath = buildWriterPath(config.getDestination)
+
+    // one twitter id per input line, keyed into 100 hash buckets
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+
+    // these datums contain 'Follow' objects
+    val followDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+
+    val follows: DataStream[Follow] = followDatums
+      .map(datum => datum.getDocument.asInstanceOf[Follow])
+
+    val jsons: DataStream[String] = follows
+      .map(follow => StreamsJacksonMapper.getInstance.writeValueAsString(follow))
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    env.execute("FlinkTwitterFollowingPipeline")
+  }
+
+  /** For each input id, runs a TwitterFollowingProvider to completion and emits every datum it produces. */
+  class FollowingCollectorFlatMapFunction(
+    twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+    flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+  ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+      collectConnections(input, out)
+    }
+
+    def collectConnections(id : String, out : Collector[StreamsDatum]): Unit = {
+      val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(
+        twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000L).asInstanceOf[TwitterFollowingConfiguration])
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      try {
+        do {
+          Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+          twitProvider.readCurrent().iterator().foreach(out.collect(_))
+        } while( twitProvider.isRunning )
+        // drain datums queued between the last read and the provider stopping
+        twitProvider.readCurrent().iterator().foreach(out.collect(_))
+      } finally twitProvider.cleanUp()
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
new file mode 100644
index 0000000..f8e221c
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -0,0 +1,165 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
+import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
+import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink streaming job that collects tweets for each twitter id in a text
+ * file with TwitterTimelineProvider and writes them out as JSON.
+ * Created by sblackmon on 7/29/15.
+ */
+object FlinkTwitterPostsPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Resolves and validates the job configuration, then runs the pipeline
+   * on its own thread and waits for it to finish.
+   */
+  override def main(args: Array[String]) = {
+    super.main(args)
+    // typesafe can still be null if FlinkBase.main did not resolve a configuration
+    val typesafeConfig = if( typesafe != null ) typesafe else StreamsConfigurator.getConfig
+    val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafeConfig)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  /**
+   * Validates the job configuration: it and its source, destination and
+   * twitter sections must all be non-null.
+   *
+   * @return false, after logging the reason, if any of them is missing
+   */
+  def setup(jobConfig: TwitterPostsPipelineConfiguration): Boolean = {
+    LOGGER.info("TwitterPostsPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) return reportInvalid("jobConfig is null!")
+    if( jobConfig.getSource == null ) return reportInvalid("jobConfig.getSource is null!")
+    if( jobConfig.getDestination == null ) return reportInvalid("jobConfig.getDestination is null!")
+    if( jobConfig.getTwitter == null ) return reportInvalid("jobConfig.getTwitter is null!")
+
+    true
+  }
+
+  /** Reports a configuration error to the log and to stderr, then returns false. */
+  private def reportInvalid(message: String): Boolean = {
+    LOGGER.error(message)
+    System.err.println(message)
+    false
+  }
+
+}
+
+class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterPostsPipeline._
+
+  /**
+   * Builds and executes the job graph: ids read from a text file are keyed,
+   * expanded into Tweet documents, serialized to JSON and written out.
+   */
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    // one twitter id per input line, keyed into 100 hash buckets; this is the only
+    // source, so the input file is read once
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+
+    // these datums contain 'Tweet' objects
+    val tweetDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+
+    val tweets: DataStream[Tweet] = tweetDatums
+      .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
+
+    val jsons: DataStream[String] = tweets
+      .map(tweet => StreamsJacksonMapper.getInstance.writeValueAsString(tweet)).name("json")
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    env.execute("FlinkTwitterPostsPipeline")
+  }
+
+  /**
+   * For each input id, runs a TwitterTimelineProvider (maxItems = 200)
+   * to completion and emits every datum it produces.
+   */
+  class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+      collectPosts(input, out)
+    }
+
+    def collectPosts(id : String, out : Collector[StreamsDatum]): Unit = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterTimelineProvider =
+        new TwitterTimelineProvider(
+          twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200L)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      try {
+        do {
+          Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+          twitProvider.readCurrent().iterator().foreach(out.collect(_))
+        } while( twitProvider.isRunning )
+        // drain datums queued between the last read and the provider stopping
+        twitProvider.readCurrent().iterator().foreach(out.collect(_))
+      } finally {
+        twitProvider.cleanUp()
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
new file mode 100644
index 0000000..a081c74
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -0,0 +1,163 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.lang
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
+
+import scala.collection.JavaConversions._
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers._
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Created by sblackmon on 3/15/16.
+ */
+object FlinkTwitterUserInformationPipeline extends FlinkBase {
+
+  // Companion of the FlinkTwitterUserInformationPipeline class: command-line entry point,
+  // configuration validation, and the LOGGER/MAPPER the class pulls in via import.
+  //
+  // NOTE(review): this file also imports
+  // com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER,
+  // which appears to come from outside this project and duplicates the LOGGER below —
+  // confirm and remove that import.
+
+  val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Command-line entry point.
+   *
+   * Steps:
+   *  - delegates to FlinkBase.main;
+   *  - detects a TwitterUserInformationPipelineConfiguration from `typesafe` (presumably
+   *    provided by FlinkBase);
+   *  - exits with status 1 if [[setup]] rejects the configuration;
+   *  - otherwise runs the pipeline on a separate thread and waits for it to finish.
+   */
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  /**
+   * Checks that the configuration and its source, destination and twitter sections are set.
+   *
+   * The first missing item found is reported to both the logger and stderr.
+   *
+   * @param jobConfig configuration to validate; may be null
+   * @return true when nothing required is missing, false otherwise
+   */
+  def setup(jobConfig: TwitterUserInformationPipelineConfiguration): Boolean = {
+
+    LOGGER.info("TwitterUserInformationPipelineConfiguration: " + jobConfig)
+
+    // Checked in order; the null check on jobConfig guards the getters that follow.
+    val missing: Option[String] =
+      if( jobConfig == null ) Some("jobConfig is null!")
+      else if( jobConfig.getSource == null ) Some("jobConfig.getSource is null!")
+      else if( jobConfig.getDestination == null ) Some("jobConfig.getDestination is null!")
+      else if( jobConfig.getTwitter == null ) Some("jobConfig.getTwitter is null!")
+      else None
+
+    missing.foreach(reportSetupError)
+    missing.isEmpty
+  }
+
+  /** Reports a configuration validation failure to both the logger and stderr. */
+  private def reportSetupError(message: String): Unit = {
+    LOGGER.error(message)
+    System.err.println(message)
+  }
+
+}
+
+/**
+ * Flink streaming job that:
+ *  - reads ids from the configured source;
+ *  - batches them with a keyed count window;
+ *  - looks up each batch with a TwitterUserInformationProvider;
+ *  - writes the resulting User documents as JSON to the configured destination.
+ *
+ * @param config pipeline configuration; by default detected from StreamsConfigurator.getConfig
+ */
+class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterUserInformationPipeline._
+
+  /** Builds the job graph and executes it; returns when env.execute returns. */
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    // Spread ids over up to 100 keys. No further .name(...) on `ids` here: that would
+    // rename the text source and overwrite the "ids" name set just above.
+    val keyed_ids: KeyedStream[String, Int] = ids.keyBy( id => (id.hashCode % 100).abs )
+
+    // Group ids per key into windows of 100; each list is looked up by one provider instance.
+    // NOTE(review): a count window only fires once 100 ids have arrived for its key. With up to
+    // 100 keys, small inputs may never fill a window. Confirm that trailing partial windows are
+    // emitted when the finite text source ends.
+    val idWindows: WindowedStream[String, Int, GlobalWindow] = keyed_ids.countWindow(100)
+
+    val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
+
+    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+
+    val users: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
+
+    val jsons: DataStream[String] = users
+      .map(user => {
+        // Presumably looked up inside the closure so no mapper is serialized with the function.
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(user)
+      }).name("jsons")
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
+
+    env.execute("FlinkTwitterUserInformationPipeline")
+  }
+
+  /**
+   * Emits the ids of each fired window as one list of provider ids (via
+   * FlinkUtil.toProviderId). Empty windows emit nothing.
+   */
+  class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
+    override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {
+      if( input.nonEmpty )
+        out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+    }
+  }
+
+  /** Flat-maps one list of ids to the datums returned by a TwitterUserInformationProvider for that list. */
+  class profileCollectorFlatMapFunction extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+    override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+      collectProfiles(input, out)
+    }
+
+    /**
+     * Runs a user-information provider for `ids` until it reports it is no longer running,
+     * emitting every datum read. The provider is always cleaned up, even if emitting fails.
+     *
+     * @param ids provider ids to look up
+     * @param out collector receiving each StreamsDatum read from the provider
+     */
+    def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterUserInformationProvider =
+        new TwitterUserInformationProvider(
+          twitterConfiguration.withInfo(ids)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      try {
+        // Poll: wait providerWaitMs, emit whatever is currently available, repeat while running.
+        do {
+          Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+          twitProvider.readCurrent().iterator().foreach(out.collect(_))
+        } while( twitProvider.isRunning )
+      } finally {
+        // A new provider is built per id list; release it as its prepare/startStream lifecycle expects.
+        twitProvider.cleanUp()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
new file mode 100644
index 0000000..22f30f5
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterFollowingPipeline
+=============================
+
+Description:
+-----------------
+
+Collects Twitter friends or followers with Flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterFollowingPipeline.dot](FlinkTwitterFollowingPipeline.dot "FlinkTwitterFollowingPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterFollowingPipeline.dot.svg](./FlinkTwitterFollowingPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
new file mode 100644
index 0000000..5f77994
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterPostsPipeline
+=========================
+
+Description:
+-----------------
+
+Collects Twitter posts with Flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterPostsPipeline.dot](FlinkTwitterPostsPipeline.dot "FlinkTwitterPostsPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterPostsPipeline.dot.svg](./FlinkTwitterPostsPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
new file mode 100644
index 0000000..5e0d1fe
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterUserInformationPipeline
+===================================
+
+Description:
+-----------------
+
+Collects Twitter users with Flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterUserInformationPipeline.dot](FlinkTwitterUserInformationPipeline.dot "FlinkTwitterUserInformationPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterUserInformationPipeline.dot.svg](./FlinkTwitterUserInformationPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
new file mode 100644
index 0000000..19e44cf
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+flink-twitter-collection
+========================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+
+Description:
+------------
+Collects large batches of documents from api.twitter.com, starting from a seed set of IDs.
+
+Streams:
+--------
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+
+<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
+
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+
+Build:
+---------
+
+ mvn clean install verify
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "StreamsFlinkConfiguration.json"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "StreamsFlinkConfiguration.json"
+ },
+ "properties": {
+ "parallel": {
+ "type": "integer",
+ "default": 1
+ },
+ "providerWaitMs": {
+ "type": "integer",
+ "default": 1000
+ },
+ "checkpointIntervalMs": {
+ "type": "integer",
+ "default": 300000
+ },
+ "checkpointTimeoutMs": {
+ "type": "integer",
+ "default": 30000
+ },
+ "restartAttempts": {
+ "type": "integer",
+ "description": "number of restart attempts",
+ "default": 3
+ },
+ "restartDelayMs": {
+ "type": "integer",
+ "description": "delay in milliseconds",
+ "default": 10000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+ },
+ "properties": {
+ "parallel": {
+ "type": "integer",
+ "default": 1
+ },
+ "providerWaitMs": {
+ "type": "integer",
+ "default": 1000
+ },
+ "checkpointIntervalMs": {
+ "type": "integer",
+ "default": 300000
+ },
+ "checkpointTimeoutMs": {
+ "type": "integer",
+ "default": 30000
+ },
+ "test": {
+ "type": "boolean",
+ "default": false
+ },
+ "local": {
+ "type": "boolean",
+ "default": true
+ },
+ "restartAttempts": {
+ "type": "integer",
+ "description": "number of restart attempts",
+ "default": 3
+ },
+ "restartDelayMs": {
+ "type": "integer",
+ "description": "delay in milliseconds",
+ "default": 10000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
new file mode 100644
index 0000000..33afb29
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterFollowingBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+ },
+ "hdfs": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "s3": {
+ "type": "object",
+ "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
new file mode 100644
index 0000000..376bb4d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterPostsBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "hdfs": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "s3": {
+ "type": "object",
+ "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
new file mode 100644
index 0000000..55f9fbd
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterUserInformationBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "hdfs": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "s3": {
+ "type": "object",
+ "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file