You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/09 21:35:42 UTC

[1/6] incubator-streams-examples git commit: flink example

Repository: incubator-streams-examples
Updated Branches:
  refs/heads/master 6e93a8f7a -> 8fe6860f7


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt b/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt
new file mode 100644
index 0000000..0590b9d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/1000twitterids.txt
@@ -0,0 +1,1000 @@
+twitter:3424266646
+twitter:3277467241
+twitter:3244517214
+twitter:29953647
+twitter:63818319
+twitter:1528436754
+twitter:405580894
+twitter:322778026
+twitter:172382176
+twitter:633076833
+twitter:703735608
+twitter:2347223440
+twitter:2907929487
+twitter:950240089
+twitter:1418546592
+twitter:3318418717
+twitter:2848958704
+twitter:1120797264
+twitter:933623324
+twitter:2977700375
+twitter:328204518
+twitter:585131136
+twitter:2868789793
+twitter:158347647
+twitter:2915413161
+twitter:2217367263
+twitter:2534019247
+twitter:3033565239
+twitter:377379801
+twitter:2525341814
+twitter:3123827524
+twitter:1840932523
+twitter:3307643975
+twitter:3301777832
+twitter:961987748
+twitter:3205632255
+twitter:2799469322
+twitter:17730681
+twitter:1495242662
+twitter:1909516123
+twitter:263933760
+twitter:312651511
+twitter:2479527469
+twitter:2357151036
+twitter:346433828
+twitter:44801893
+twitter:1049697306
+twitter:2779673194
+twitter:18323141
+twitter:2172488902
+twitter:2373431930
+twitter:1038322550
+twitter:2946211549
+twitter:2911057543
+twitter:1186036284
+twitter:2878076317
+twitter:1312950464
+twitter:57323685
+twitter:32929857
+twitter:301933631
+twitter:2852217152
+twitter:330422649
+twitter:98470876
+twitter:933125156
+twitter:3237125761
+twitter:914882005
+twitter:1560239652
+twitter:900444860
+twitter:402918702
+twitter:1820690166
+twitter:3074359086
+twitter:353183684
+twitter:528544881
+twitter:1881638161
+twitter:2751762993
+twitter:3161315692
+twitter:3305680079
+twitter:1721613488
+twitter:513068659
+twitter:627186234
+twitter:3203648416
+twitter:1541163325
+twitter:1882043502
+twitter:29071727
+twitter:610104090
+twitter:2819781014
+twitter:2909115204
+twitter:213886397
+twitter:3249385591
+twitter:3086875073
+twitter:87040031
+twitter:2202487475
+twitter:334896132
+twitter:49163181
+twitter:3433984816
+twitter:543969362
+twitter:489445461
+twitter:855051894
+twitter:2792040175
+twitter:117051455
+twitter:438599410
+twitter:1387329846
+twitter:711595782
+twitter:3230662766
+twitter:2766672269
+twitter:2926781875
+twitter:863203928
+twitter:517199566
+twitter:201645935
+twitter:1555939147
+twitter:2943152669
+twitter:1324775431
+twitter:400234897
+twitter:2347416842
+twitter:1558112510
+twitter:474415350
+twitter:2153710970
+twitter:1408335014
+twitter:3633713483
+twitter:3166021013
+twitter:3530993294
+twitter:332598229
+twitter:308252069
+twitter:3317826986
+twitter:572175644
+twitter:1718271572
+twitter:2869090090
+twitter:23725109
+twitter:1926137280
+twitter:1486830500
+twitter:743080386
+twitter:3250479720
+twitter:2560441544
+twitter:2715649872
+twitter:287089153
+twitter:18761334
+twitter:2305577745
+twitter:724860668
+twitter:193306049
+twitter:2615761979
+twitter:2463299598
+twitter:1436916012
+twitter:919019185
+twitter:90502449
+twitter:50689522
+twitter:1383774679
+twitter:612784850
+twitter:410319975
+twitter:833440153
+twitter:442322844
+twitter:2181167094
+twitter:94012832
+twitter:112748352
+twitter:1474618075
+twitter:158262669
+twitter:2391506308
+twitter:882502026
+twitter:2693660146
+twitter:2971933908
+twitter:55271184
+twitter:2287356556
+twitter:2895756090
+twitter:407147132
+twitter:3262181
+twitter:313317193
+twitter:2729137002
+twitter:2939122360
+twitter:2751601568
+twitter:1215082350
+twitter:124866576
+twitter:274292311
+twitter:3310301042
+twitter:95407473
+twitter:24993769
+twitter:1342908648
+twitter:1805339413
+twitter:3118252036
+twitter:893269387
+twitter:1481149014
+twitter:463288019
+twitter:75008083
+twitter:2895489727
+twitter:965493739
+twitter:278637248
+twitter:1937513246
+twitter:422218268
+twitter:3320995462
+twitter:78682286
+twitter:2777069098
+twitter:2909553730
+twitter:2914338670
+twitter:1251667531
+twitter:2764034755
+twitter:532659717
+twitter:269002510
+twitter:29373713
+twitter:358075450
+twitter:633880614
+twitter:200374379
+twitter:141628294
+twitter:1513028977
+twitter:116798791
+twitter:2937455354
+twitter:246194623
+twitter:793925970
+twitter:115594167
+twitter:82463176
+twitter:324774974
+twitter:185844856
+twitter:2462295999
+twitter:3555105016
+twitter:1029169117
+twitter:2689309484
+twitter:1587145976
+twitter:1607241271
+twitter:3032276402
+twitter:183916933
+twitter:63766245
+twitter:151217255
+twitter:2781098109
+twitter:252081559
+twitter:1608788256
+twitter:41984573
+twitter:1896587353
+twitter:40136999
+twitter:295505814
+twitter:384867933
+twitter:116947371
+twitter:255703939
+twitter:2687800732
+twitter:76543916
+twitter:881649782
+twitter:2765729924
+twitter:1715695669
+twitter:1965383022
+twitter:2888214228
+twitter:21820514
+twitter:1727966414
+twitter:2581992818
+twitter:103999565
+twitter:741018846
+twitter:446792386
+twitter:2568989424
+twitter:2780674777
+twitter:465934916
+twitter:3378294885
+twitter:2885604327
+twitter:3336273419
+twitter:130742941
+twitter:2327629099
+twitter:1103818104
+twitter:3050036073
+twitter:2882456842
+twitter:2702914248
+twitter:2153674818
+twitter:132825659
+twitter:289758699
+twitter:2995946100
+twitter:3027449217
+twitter:2708029160
+twitter:1529367002
+twitter:608170333
+twitter:140446819
+twitter:2790688993
+twitter:1597308192
+twitter:14462028
+twitter:104062608
+twitter:370274893
+twitter:356145607
+twitter:566542629
+twitter:112587243
+twitter:39372070
+twitter:146853060
+twitter:2440984657
+twitter:3074554539
+twitter:204701034
+twitter:887623447
+twitter:1971521630
+twitter:2457208175
+twitter:466113358
+twitter:1574643830
+twitter:1465533884
+twitter:2500404589
+twitter:1633154150
+twitter:1349117870
+twitter:1658071267
+twitter:593022891
+twitter:3094177813
+twitter:1304672510
+twitter:3385525697
+twitter:2916225552
+twitter:2759773715
+twitter:1369215552
+twitter:1058390078
+twitter:2532850321
+twitter:351483656
+twitter:1902796704
+twitter:113000738
+twitter:2241245557
+twitter:2416606754
+twitter:408729540
+twitter:2530294556
+twitter:2936808249
+twitter:3138999692
+twitter:2679987883
+twitter:1448537377
+twitter:2524773906
+twitter:942079406
+twitter:2217584389
+twitter:3059427504
+twitter:3028507725
+twitter:632766658
+twitter:3302663431
+twitter:2914832897
+twitter:93487101
+twitter:2786054379
+twitter:1339647769
+twitter:531402307
+twitter:402066474
+twitter:337936675
+twitter:2760568625
+twitter:1385916396
+twitter:2595560922
+twitter:421910477
+twitter:1713100813
+twitter:352016040
+twitter:415247994
+twitter:1883606209
+twitter:2974994111
+twitter:1118022211
+twitter:3096979637
+twitter:711889867
+twitter:262890561
+twitter:233810062
+twitter:1877177168
+twitter:964106670
+twitter:164985413
+twitter:2920420361
+twitter:318936782
+twitter:3289826764
+twitter:145873735
+twitter:2523059919
+twitter:2409896179
+twitter:2292047201
+twitter:285674825
+twitter:2765549780
+twitter:2359541905
+twitter:2419103894
+twitter:358884588
+twitter:206231205
+twitter:136500778
+twitter:1397885138
+twitter:2625422097
+twitter:2524578002
+twitter:604278657
+twitter:2625634867
+twitter:73168019
+twitter:407448958
+twitter:189276174
+twitter:2507896925
+twitter:80880449
+twitter:520177827
+twitter:418469102
+twitter:2925075456
+twitter:615730636
+twitter:2995998941
+twitter:2697270934
+twitter:497135011
+twitter:2944598402
+twitter:428706893
+twitter:1345291712
+twitter:388751708
+twitter:130092079
+twitter:2984741882
+twitter:1047514436
+twitter:15927135
+twitter:2884357840
+twitter:294362779
+twitter:2870985800
+twitter:1720400449
+twitter:130027314
+twitter:2970518577
+twitter:240923858
+twitter:1613498838
+twitter:708321211
+twitter:1403382426
+twitter:2602186970
+twitter:1596855998
+twitter:280062526
+twitter:2716454552
+twitter:268720451
+twitter:2869044811
+twitter:1911762488
+twitter:392373280
+twitter:2151082712
+twitter:2770919004
+twitter:231541900
+twitter:60122778
+twitter:390006102
+twitter:240167506
+twitter:1558314660
+twitter:221608257
+twitter:852829933
+twitter:461669243
+twitter:239778483
+twitter:502146157
+twitter:1471963970
+twitter:276426707
+twitter:2336546150
+twitter:323595235
+twitter:128670043
+twitter:1308641714
+twitter:1411112756
+twitter:3011727217
+twitter:3082006921
+twitter:450537474
+twitter:2673101407
+twitter:2416030447
+twitter:51952627
+twitter:708057486
+twitter:833620748
+twitter:3024957797
+twitter:2147572362
+twitter:1712467098
+twitter:2899300501
+twitter:1348351772
+twitter:2923114629
+twitter:2779232814
+twitter:21306308
+twitter:1466314507
+twitter:1224588289
+twitter:81307783
+twitter:42717316
+twitter:315972617
+twitter:434649827
+twitter:105839296
+twitter:366063496
+twitter:34045892
+twitter:3076447389
+twitter:92437198
+twitter:3124335006
+twitter:1444393410
+twitter:351737762
+twitter:1919360383
+twitter:2836048345
+twitter:1670939112
+twitter:722140159
+twitter:92939425
+twitter:2932728756
+twitter:2831872033
+twitter:1354255123
+twitter:1689738186
+twitter:463578260
+twitter:2881582438
+twitter:912252510
+twitter:3226221887
+twitter:390827200
+twitter:269169237
+twitter:1450007192
+twitter:2735984326
+twitter:3029836305
+twitter:28291382
+twitter:785668627
+twitter:567287970
+twitter:1480004420
+twitter:131927864
+twitter:2958631308
+twitter:488490020
+twitter:2603422688
+twitter:3186614985
+twitter:177373618
+twitter:2466506329
+twitter:2651294251
+twitter:3367170684
+twitter:2673870882
+twitter:369098635
+twitter:242011326
+twitter:18099277
+twitter:1922210574
+twitter:3093762445
+twitter:470634878
+twitter:1674607392
+twitter:2920526283
+twitter:3261677580
+twitter:2192187078
+twitter:485599960
+twitter:1854850729
+twitter:95198467
+twitter:2228217740
+twitter:2171528344
+twitter:2957461230
+twitter:226615737
+twitter:1624183567
+twitter:158597677
+twitter:2909224690
+twitter:19278114
+twitter:2488284258
+twitter:2777071149
+twitter:1598064697
+twitter:2740691127
+twitter:3100908480
+twitter:1147010126
+twitter:2741161553
+twitter:439971668
+twitter:3247227273
+twitter:2884261062
+twitter:3127250575
+twitter:2942021278
+twitter:539428196
+twitter:409599986
+twitter:3161801331
+twitter:2328613860
+twitter:1903013437
+twitter:313082004
+twitter:2580495721
+twitter:209464435
+twitter:600172085
+twitter:339541217
+twitter:62219810
+twitter:583287316
+twitter:295891933
+twitter:561683767
+twitter:229192352
+twitter:1357869918
+twitter:235438136
+twitter:1599249169
+twitter:583879210
+twitter:507744802
+twitter:1696336261
+twitter:2323537206
+twitter:36882220
+twitter:541528426
+twitter:956202559
+twitter:387936537
+twitter:211658842
+twitter:2685186010
+twitter:2581656488
+twitter:391154378
+twitter:122932105
+twitter:409764153
+twitter:129737967
+twitter:2848806360
+twitter:3054860719
+twitter:372199585
+twitter:2316121597
+twitter:703345746
+twitter:3335505287
+twitter:2466151422
+twitter:380038166
+twitter:420561214
+twitter:2977085351
+twitter:110955327
+twitter:3004295886
+twitter:2362857361
+twitter:3053844460
+twitter:3182081552
+twitter:324208260
+twitter:2571790321
+twitter:1061498868
+twitter:2187395299
+twitter:2187482779
+twitter:3096652530
+twitter:2538239672
+twitter:3809634552
+twitter:2306848839
+twitter:1544061547
+twitter:151075965
+twitter:3250238556
+twitter:16157689
+twitter:1692663644
+twitter:1356000732
+twitter:436774994
+twitter:45503055
+twitter:1086037316
+twitter:2798297775
+twitter:2923485772
+twitter:58731726
+twitter:211816170
+twitter:885013716
+twitter:2608529078
+twitter:2954917057
+twitter:2271021600
+twitter:173743066
+twitter:451543575
+twitter:3219728436
+twitter:399824828
+twitter:2464688153
+twitter:2541069631
+twitter:1522892262
+twitter:3167829845
+twitter:944851321
+twitter:2471474509
+twitter:68073858
+twitter:1496221376
+twitter:13979882
+twitter:2218792189
+twitter:302123873
+twitter:2845915546
+twitter:431402814
+twitter:1364254945
+twitter:2711277666
+twitter:2766696876
+twitter:2495441323
+twitter:2844317433
+twitter:138009079
+twitter:2578631100
+twitter:478167529
+twitter:1222728360
+twitter:1323688411
+twitter:2883066187
+twitter:2443554697
+twitter:411631689
+twitter:68537682
+twitter:1027019269
+twitter:1660752493
+twitter:987324488
+twitter:2764106926
+twitter:2184511674
+twitter:103419315
+twitter:2310456424
+twitter:1572938088
+twitter:2554895281
+twitter:34138105
+twitter:2942100621
+twitter:160517898
+twitter:285075974
+twitter:2260805169
+twitter:19390498
+twitter:301696842
+twitter:2588239985
+twitter:2886588596
+twitter:2962622367
+twitter:1867897483
+twitter:2827053488
+twitter:1447767319
+twitter:2924491293
+twitter:167327096
+twitter:3309592402
+twitter:2795575638
+twitter:578758971
+twitter:2888665561
+twitter:30542348
+twitter:1437049609
+twitter:2242541566
+twitter:74354017
+twitter:58900854
+twitter:2159055031
+twitter:246517688
+twitter:2916873012
+twitter:1110055280
+twitter:562430843
+twitter:761797794
+twitter:1648208552
+twitter:301483343
+twitter:2896842048
+twitter:522103295
+twitter:1578517986
+twitter:2659610776
+twitter:2890560429
+twitter:1427665578
+twitter:268363160
+twitter:563709041
+twitter:2172300002
+twitter:2791262431
+twitter:3039809351
+twitter:2914940301
+twitter:2746560353
+twitter:2892191616
+twitter:71596845
+twitter:233770184
+twitter:1530949130
+twitter:105906110
+twitter:755347622
+twitter:490836906
+twitter:357603454
+twitter:324517203
+twitter:2835402315
+twitter:3285479894
+twitter:86368327
+twitter:238219970
+twitter:3153173945
+twitter:2732361234
+twitter:2357626327
+twitter:346602505
+twitter:13732632
+twitter:44055265
+twitter:2998032219
+twitter:482072312
+twitter:1721073866
+twitter:1386781034
+twitter:168194206
+twitter:1213443144
+twitter:181296114
+twitter:942598400
+twitter:2955577216
+twitter:582056669
+twitter:747540468
+twitter:2371722140
+twitter:360824004
+twitter:3023711736
+twitter:207032580
+twitter:2748107976
+twitter:464428175
+twitter:3150849096
+twitter:85450014
+twitter:2840066340
+twitter:2287819200
+twitter:240931426
+twitter:553606800
+twitter:397876544
+twitter:2195298230
+twitter:2601812005
+twitter:3013344739
+twitter:17599363
+twitter:1572639314
+twitter:3377673407
+twitter:303420278
+twitter:2811879995
+twitter:526860891
+twitter:346333874
+twitter:113568311
+twitter:705488304
+twitter:3238867619
+twitter:333772149
+twitter:373309716
+twitter:300472003
+twitter:3223424681
+twitter:2895699896
+twitter:3241119570
+twitter:1147453440
+twitter:3135402609
+twitter:521763744
+twitter:2702966971
+twitter:2878317616
+twitter:845031697
+twitter:2855454471
+twitter:3051902539
+twitter:482306439
+twitter:129173738
+twitter:306572138
+twitter:2941951538
+twitter:762707233
+twitter:2732608168
+twitter:1228456939
+twitter:246020724
+twitter:1920607602
+twitter:14434245
+twitter:1254943537
+twitter:1520746602
+twitter:150745124
+twitter:1350160351
+twitter:38707222
+twitter:267766858
+twitter:2992121760
+twitter:712666764
+twitter:983036864
+twitter:289490939
+twitter:269797384
+twitter:100215048
+twitter:3099557245
+twitter:2339741570
+twitter:306005146
+twitter:1182227460
+twitter:288235870
+twitter:1412832260
+twitter:455190443
+twitter:489912183
+twitter:448994061
+twitter:2944595072
+twitter:2453094914
+twitter:2899434206
+twitter:59288818
+twitter:2824706688
+twitter:423363992
+twitter:972850482
+twitter:997868714
+twitter:1203750733
+twitter:176147179
+twitter:115110596
+twitter:2978397615
+twitter:2528946267
+twitter:620180433
+twitter:365949935
+twitter:110609853
+twitter:1533494268
+twitter:2723839166
+twitter:34186887
+twitter:2864430424
+twitter:76942977
+twitter:361086733
+twitter:2724200587
+twitter:635206139
+twitter:2757801421
+twitter:19651443
+twitter:3364322949
+twitter:2770576744
+twitter:2168612560
+twitter:764020297
+twitter:2558268513
+twitter:2855384901
+twitter:1881414907
+twitter:2502212139
+twitter:3250037586
+twitter:2525185944
+twitter:591375982
+twitter:707911211
+twitter:3025041666
+twitter:19785599
+twitter:2311172950
+twitter:922817815
+twitter:739363530
+twitter:2812894393
+twitter:2496283986
+twitter:206162815
+twitter:590916342
+twitter:354053245
+twitter:2735195854
+twitter:2788759128
+twitter:3510947235
+twitter:3490740532
+twitter:2920847304
+twitter:2681444558
+twitter:2856805755
+twitter:3103899682
+twitter:145893832
+twitter:3065663910
+twitter:2736009516
+twitter:2835226230
+twitter:1590913771
+twitter:2700889555
+twitter:2221272164
+twitter:109780161
+twitter:700221218
+twitter:541753453
+twitter:126575915
+twitter:274336817
+twitter:2498172455
+twitter:2809515630
+twitter:2588774684
+twitter:296734891
+twitter:2212410182
+twitter:243027454
+twitter:1336526904
+twitter:397062736
+twitter:449331876
+twitter:30619307
+twitter:2310483811
+twitter:2437586509
+twitter:191710730
+twitter:1084185378
+twitter:2831486681
+twitter:1606477879
+twitter:969600636
+twitter:529783214
+twitter:2928131586
+twitter:190041293
+twitter:2967031274
+twitter:2165962781
+twitter:376501355
+twitter:284137985
+twitter:266863824
+twitter:407944074
+twitter:108456036
+twitter:1641294422
+twitter:900733706
+twitter:1063071450
+twitter:1682722328
+twitter:341419520
+twitter:1644293778
+twitter:2245151467
+twitter:511176989
+twitter:241922669
+twitter:3388315624
+twitter:1909431145
+twitter:2223820028
+twitter:600581315
+twitter:1723555076
+twitter:2748445313
+twitter:561211823
+twitter:561022931
+twitter:2751429993
+twitter:2714908343
+twitter:16165257
+twitter:524623359
+twitter:306741266
+twitter:469994381
+twitter:2561892084
+twitter:998802661
+twitter:1492924374
+twitter:789039140
+twitter:210150093
+twitter:817544820
+twitter:35740178
+twitter:326162841
+twitter:1447331628
+twitter:17493441
+twitter:2874693608
+twitter:965027312
+twitter:261936985
+twitter:510564259
+twitter:728031187
+twitter:164696234
+twitter:2204519310
+twitter:1626241164
+twitter:1024940588
+twitter:221486613
+twitter:571084565
+twitter:3029264508
+twitter:221716563
+twitter:2211417135
+twitter:499972359
+twitter:1565989165
+twitter:2436927208
+twitter:381029291
+twitter:2730580620
+twitter:3436438413
+twitter:2466014604
+twitter:538990742
+twitter:2935470687
+twitter:1162845468
+twitter:468108082
+twitter:2383897542
+twitter:2542119658
+twitter:1962281514
+twitter:171235080
+twitter:536915535100125185
+twitter:2841076618
+twitter:3006098500
+twitter:1057158554
+twitter:3245676721
+twitter:251087536
+twitter:3082811549
+twitter:281785349
+twitter:1674871100
+twitter:1898659951
+twitter:1414854156
+twitter:428693618
+twitter:2385953101
+twitter:2281213477
+twitter:2786368894
+twitter:2253203998
+twitter:357277727
+twitter:1358707970
+twitter:545186198
+twitter:3033613587
+twitter:107121821
+twitter:595965259
+twitter:583894637
+twitter:1306698787
+twitter:442262869
+twitter:2868353318
+twitter:1908436844
+twitter:271982042
+twitter:495202171
+twitter:251586884
+twitter:3151032974
+twitter:2213682568
+twitter:1203133039
+twitter:193128957
+twitter:597407120
+twitter:2781102086
+twitter:369254505
+twitter:62831036
+twitter:2328734640
+twitter:2579064082
+twitter:3271313827
+twitter:2880366619
+twitter:2323026113
+twitter:446380518
+twitter:245418139
+twitter:261211664
+twitter:1893329208
+twitter:3406596309
+twitter:584967077
+twitter:1708862304
+twitter:388961426
+twitter:2421535351
+twitter:2194375668
+twitter:2790313673
+twitter:2728894977
+twitter:2829174824
+twitter:784541196
+twitter:959902393
+twitter:249705367
+twitter:1677679309
+twitter:2825975175
+twitter:1305768366
+twitter:373475046
+twitter:785362464
+twitter:419607671
+twitter:61031675
+twitter:3854236343
+twitter:714603248
+twitter:1301447720
+twitter:827660912
+twitter:2383764684
+twitter:3180084906
+twitter:3265558124
+twitter:608536922
+twitter:238943561
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
new file mode 100644
index 0000000..e74f00c
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
@@ -0,0 +1,10 @@
+twitter {
+  endpoint = followers
+  version = 1.1
+  oauth {
+    consumerKey = ""
+    consumerSecret = ""
+    accessToken = ""
+    accessTokenSecret = ""
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
new file mode 100644
index 0000000..63a6481
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
@@ -0,0 +1,10 @@
+twitter {
+  version = 1.1
+  endpoint = statuses
+  oauth {
+    consumerKey = ""
+    consumerSecret = ""
+    accessToken = ""
+    accessTokenSecret = ""
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
new file mode 100644
index 0000000..6e0a879
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
@@ -0,0 +1,10 @@
+twitter {
+  version = 1.1
+  endpoint = users
+  oauth {
+    consumerKey = ""
+    consumerSecret = ""
+    accessToken = ""
+    accessTokenSecret = ""
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/resources/asf.txt
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/asf.txt b/flink/flink-twitter-collection/src/test/resources/asf.txt
new file mode 100644
index 0000000..c2b1ea1
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/asf.txt
@@ -0,0 +1 @@
+twitter:18055613
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
new file mode 100644
index 0000000..aa2b1a9
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -0,0 +1,81 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.nio.file.{Files, Paths}
+
+import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterFollowingPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
+
+  @Test
+  def flinkTwitterFollowersPipelineFriendsIT = {
+
+    val testConfig : TwitterFollowingPipelineConfiguration =
+      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+    testConfig.getTwitter.setEndpoint("friends")
+    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
+    source.setPath("target/test-classes")
+    testConfig.setSource(source);
+    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath("target/test-classes")
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000l)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends")))
+      assert(
+        Source.fromFile("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends", "UTF-8").getLines.size
+          > 90)
+    }
+
+  }
+
+  @Test
+  def flinkTwitterFollowersPipelineFollowersIT = {
+
+    val testConfig : TwitterFollowingPipelineConfiguration =
+      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+    testConfig.getTwitter.setEndpoint("followers")
+    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
+    source.setPath("target/test-classes")
+    testConfig.setSource(source);
+    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/followers").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath("target/test-classes")
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000l)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterFollowingPipeline/followers")))
+      assert(
+        Source.fromFile("target/test-classes/FlinkTwitterFollowingPipeline/followers", "UTF-8").getLines.size
+          > 500)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
new file mode 100644
index 0000000..8a942e5
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -0,0 +1,55 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.nio.file.{Files, Paths}
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
+import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+import org.testng.annotations.Test
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterPostsPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
+
+  @Test
+  def flinkTwitterPostsPipelineIT = {
+
+    val testConfig : TwitterPostsPipelineConfiguration =
+      new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
+    source.setPath("target/test-classes")
+    testConfig.setSource(source);
+    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterPostsPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath("target/test-classes")
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000l)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterPostsPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterPostsPipeline")))
+      assert(
+        Source.fromFile("target/test-classes/FlinkTwitterPostsPipeline", "UTF-8").getLines.size
+          >= 200)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
new file mode 100644
index 0000000..3d21244
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -0,0 +1,56 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.nio.file.{Files, Paths}
+
+import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.scalatest.FlatSpec
+import org.scalatest._
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+import org.scalatest.Ignore
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+import org.testng.annotations.Test
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
+
+  @Test
+  def flinkTwitterUserInformationPipelineIT = {
+
+    val testConfig : TwitterUserInformationPipelineConfiguration =
+      new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
+    source.setPath("target/test-classes")
+    testConfig.setSource(source);
+    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/TwitterUserInformationPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
+    destination.setPath("target/test-classes")
+    testConfig.setDestination(destination)
+    testConfig.setProviderWaitMs(1000l)
+    testConfig.setTest(true)
+
+    val job = new FlinkTwitterUserInformationPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get("target/test-classes/TwitterUserInformationPipeline")))
+      assert(
+        Source.fromFile("target/test-classes/TwitterUserInformationPipeline", "UTF-8").getLines.size
+          > 500)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
new file mode 100644
index 0000000..7054e89
--- /dev/null
+++ b/flink/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>streams-examples</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.4-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>streams-examples-flink</artifactId>
+
+    <packaging>pom</packaging>
+    <name>streams-examples-flink</name>
+
+    <description>Contributed examples of use cases for Streams using flink</description>
+
+    <properties>
+
+    </properties>
+
+    <modules>
+        <module>flink-twitter-collection</module>
+    </modules>
+
+    <build>
+
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5d25974..9984a5b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>streams-master</artifactId>
         <groupId>org.apache.streams</groupId>
-        <version>0.3-incubating</version>
+        <version>0.4-incubating-SNAPSHOT</version>
 	</parent>
 
     <artifactId>streams-examples</artifactId>
@@ -172,6 +172,7 @@
     </properties>
 
     <modules>
+        <module>flink</module>
         <module>local</module>
     </modules>
 
@@ -186,39 +187,39 @@
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-config</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-core</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-filters</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-pojo</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-pojo</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
                 <type>test-jar</type>
                 <scope>test</scope>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-util</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-util</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
                 <type>test-jar</type>
                 <scope>test</scope>
             </dependency>
@@ -230,32 +231,32 @@
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-runtime-local</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-provider-twitter</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-persist-elasticsearch</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-persist-graph</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-persist-hdfs</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
                 <artifactId>streams-persist-mongo</artifactId>
-                <version>0.3-incubating</version>
+                <version>0.4-incubating-SNAPSHOT</version>
             </dependency>
 
         </dependencies>



[3/6] incubator-streams-examples git commit: flink examples building and running

Posted by sb...@apache.org.
flink examples building and running


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0112a838
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0112a838
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0112a838

Branch: refs/heads/master
Commit: 0112a83874bb7f896a4e3964d5fde75e5967afe6
Parents: 4491cfe
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Sep 29 19:15:39 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Sep 29 19:15:39 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/pom.xml          | 127 +++++++++++------
 .../TwitterSpritzerPipelineConfiguration.json   |  29 ++++
 .../FlinkTwitterFollowingPipeline.scala         |  11 +-
 .../collection/FlinkTwitterPostsPipeline.scala  |  22 ++-
 .../FlinkTwitterSpritzerPipeline.scala          | 138 +++++++++++++++++++
 .../FlinkTwitterUserInformationPipeline.scala   |  28 ++--
 .../TwitterSpritzerPipelineConfiguration.json   |  29 ++++
 .../FlinkTwitterFollowingPipeline.conf          |  10 --
 ...linkTwitterFollowingPipelineFollowersIT.conf |  16 +++
 .../FlinkTwitterFollowingPipelineFriendsIT.conf |  16 +++
 .../resources/FlinkTwitterPostsPipeline.conf    |  10 --
 .../resources/FlinkTwitterPostsPipelineIT.conf  |  15 ++
 .../FlinkTwitterUserInformationPipeline.conf    |  10 --
 .../FlinkTwitterUserInformationPipelineIT.conf  |  15 ++
 .../test/FlinkTwitterFollowingPipelineIT.scala  |  71 +++++-----
 .../test/FlinkTwitterPostsPipelineIT.scala      |  38 ++---
 .../test/FlinkTwitterSpritzerPipelineIT.scala   |  57 ++++++++
 .../FlinkTwitterUserInformationPipelineIT.scala |  33 +++--
 flink/pom.xml                                   |   3 -
 local/elasticsearch-hdfs/pom.xml                |  14 +-
 local/elasticsearch-reindex/pom.xml             |   2 +-
 local/mongo-elasticsearch-sync/pom.xml          |  12 +-
 local/twitter-follow-graph/pom.xml              |  10 +-
 local/twitter-history-elasticsearch/pom.xml     |  14 +-
 local/twitter-userstream-elasticsearch/pom.xml  |  14 +-
 25 files changed, 543 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 33b05fe..2d35035 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -34,17 +34,38 @@
     <description>Collects twitter documents using flink.</description>
 
     <properties>
-        <docker.repo>apachestreams</docker.repo>
+        <testng.version>6.9.10</testng.version>
         <hdfs.version>2.7.0</hdfs.version>
         <flink.version>1.1.2</flink.version>
+        <scala.version>2.10.6</scala.version>
+        <scalatest.version>2.2.5</scalatest.version>
         <scala.suffix>2.10</scala.suffix>
+        <scala-maven.plugin.version>3.2.2</scala-maven.plugin.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.suffix}</artifactId>
+            <version>${scalatest.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -65,13 +86,11 @@
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -277,6 +296,19 @@
             <artifactId>logback-core</artifactId>
             <version>${logback.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -293,17 +325,6 @@
             </testResource>
         </testResources>
         <plugins>
-            <plugin>
-                <artifactId>maven-clean-plugin</artifactId>
-                <configuration>
-                    <filesets>
-                        <fileset>
-                            <directory>data</directory>
-                            <followSymlinks>false</followSymlinks>
-                        </fileset>
-                    </filesets>
-                </configuration>
-            </plugin>
             <!-- This binary runs with logback -->
             <!-- Keep log4j out -->
             <plugin>
@@ -334,8 +355,56 @@
             </executions>
         </plugin>
         <plugin>
+            <groupId>net.alchim31.maven</groupId>
+            <artifactId>scala-maven-plugin</artifactId>
+            <version>${scala-maven.plugin.version}</version>
+            <executions>
+                <execution>
+                    <id>scala-compile-first</id>
+                    <phase>process-resources</phase>
+                    <goals>
+                        <goal>add-source</goal>
+                        <goal>compile</goal>
+                    </goals>
+                </execution>
+                <execution>
+                    <id>scala-test-compile</id>
+                    <phase>process-test-resources</phase>
+                    <goals>
+                        <goal>testCompile</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
+        <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <finalName>${project.build.finalName}</finalName>
+                        <filters>
+                            <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                    <exclude>META-INF/*.SF</exclude>
+                                    <exclude>META-INF/*.DSA</exclude>
+                                    <exclude>META-INF/*.RSA</exclude>
+                                    <exclude>**/logback.xml</exclude>
+                                    <exclude>**/log4j.properties</exclude>
+                                </excludes>
+                            </filter>
+                        </filters>
+                        <transformers>
+                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
         </plugin>
         <plugin>
             <groupId>org.jsonschema2pojo</groupId>
@@ -348,7 +417,6 @@
                     <sourcePath>src/main/jsonschema</sourcePath>
                 </sourcePaths>
                 <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
-                <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
                 <useJodaDates>false</useJodaDates>
             </configuration>
             <executions>
@@ -379,25 +447,6 @@
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <includes>**/*.json</includes>
-                    <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
-                    <includeGroupIds>org.apache.streams</includeGroupIds>
-                    <includeTypes>test-jar</includeTypes>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test-resource-dependencies</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>unpack-dependencies</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <version>2.12.4</version>
                 <executions>
@@ -410,10 +459,6 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>io.fabric8</groupId>
-                <artifactId>docker-maven-plugin</artifactId>
-            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2ac7d32..2fd9336 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -3,6 +3,7 @@ package org.apache.streams.examples.flink.twitter.collection
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.core.fs.FileSystem
@@ -17,10 +18,10 @@ import org.apache.streams.twitter.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
 import org.apache.streams.twitter.provider.TwitterFollowingProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
@@ -75,6 +76,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
             return false
         }
 
+        Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
         return true
 
     }
@@ -134,7 +141,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
         def collectConnections(id : String, out : Collector[StreamsDatum]) = {
             val twitProvider: TwitterFollowingProvider =
                 new TwitterFollowingProvider(
-                    twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
+                    twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
                 )
             twitProvider.prepare(twitProvider)
             twitProvider.startStream()

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index f8e221c..beea973 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -3,11 +3,8 @@ package org.apache.streams.examples.flink.twitter.collection
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
-import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
-import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
@@ -33,6 +30,7 @@ import org.apache.streams.twitter.TwitterUserInformationConfiguration
 import org.apache.streams.twitter.pojo.{Tweet, User}
 import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
 import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
@@ -84,6 +82,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
       return false
     }
 
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
     return true
 
   }
@@ -105,16 +109,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val outPath = buildWriterPath(config.getDestination)
 
-    //val inProps = buildKafkaProps(config.getSourceTopic)
-
     val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
 
-    //val idTopicIn = new KafkaSink()
-
-//    val idTopicOut : DataStream[String] = env.addSource[String](
-//      new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
-//        inProps));
-
     val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
 
     // these datums contain 'Tweet' objects
@@ -149,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
       val twitterConfiguration = config.getTwitter
       val twitProvider: TwitterTimelineProvider =
         new TwitterTimelineProvider(
-          twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l)
+          twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
         )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
new file mode 100644
index 0000000..b615806
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -0,0 +1,138 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.provider.TwitterStreamProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 7/29/15.
+  */
+object FlinkTwitterSpritzerPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  def setup(jobConfig: TwitterSpritzerPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterSpritzerPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
+    return true
+
+  }
+
+}
+
+class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterSpritzerPipeline._
+
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setNumberOfExecutionRetries(0)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter));
+
+    if( config.getTest == false )
+      streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism);
+
+    // if( test == true ) jsons.print();
+
+    env.execute("FlinkTwitterPostsPipeline")
+  }
+
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+
+    var twitProvider: TwitterStreamProvider = _
+
+    @throws[Exception]
+    override def open(parameters: Configuration): Unit = {
+      twitProvider = new TwitterStreamProvider( sourceConfig )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+    }
+
+    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        iterator = twitProvider.readCurrent().iterator()
+        iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+      } while( twitProvider.isRunning )
+    }
+
+    override def cancel(): Unit = {
+      twitProvider.cleanUp()
+    }
+
+    @throws[Exception]
+    override def close(): Unit = {
+      twitProvider.cleanUp()
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index a081c74..867255d 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -1,28 +1,18 @@
 package org.apache.streams.examples.flink.twitter.collection
 
-import java.lang
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
-
-import scala.collection.JavaConversions._
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
 import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.streams.examples.flink.FlinkBase
 import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers._
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -30,13 +20,13 @@ import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.User
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
 import org.slf4j.{Logger, LoggerFactory}
 
+import scala.collection.JavaConversions._
+
 /**
   * Created by sblackmon on 3/15/16.
   */
@@ -85,6 +75,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
       return false
     }
 
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
     return true
 
   }
@@ -137,7 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
     override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
-        out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+        out.collect(input.map(id => toProviderId(id)).toList)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
deleted file mode 100644
index e74f00c..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
-  endpoint = followers
-  version = 1.1
-  oauth {
-    consumerKey = ""
-    consumerSecret = ""
-    accessToken = ""
-    accessTokenSecret = ""
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
new file mode 100644
index 0000000..87057be
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -0,0 +1,16 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
+}
+twitter.endpoint = friends
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
new file mode 100644
index 0000000..b5212ed
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -0,0 +1,16 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
+}
+twitter.endpoint = friends
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
deleted file mode 100644
index 63a6481..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
-  version = 1.1
-  endpoint = statuses
-  oauth {
-    consumerKey = ""
-    consumerSecret = ""
-    accessToken = ""
-    accessTokenSecret = ""
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
new file mode 100644
index 0000000..6954113
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterPostsPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
deleted file mode 100644
index 6e0a879..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
-  version = 1.1
-  endpoint = users
-  oauth {
-    consumerKey = ""
-    consumerSecret = ""
-    accessToken = ""
-    accessTokenSecret = ""
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
new file mode 100644
index 0000000..342a850
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterUserInformationPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
index aa2b1a9..b051e90 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -1,17 +1,22 @@
 package com.peoplepattern.streams.twitter.collection
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 
-import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
 import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
 
 /**
   * Created by sblackmon on 3/13/16.
@@ -20,30 +25,31 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
 
+  import FlinkTwitterFollowingPipeline._
+
   @Test
   def flinkTwitterFollowersPipelineFriendsIT = {
 
-    val testConfig : TwitterFollowingPipelineConfiguration =
-      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    testConfig.getTwitter.setEndpoint("friends")
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterFollowingPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
     jobThread.join
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends")))
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           > 90)
     }
 
@@ -52,27 +58,26 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
   @Test
   def flinkTwitterFollowersPipelineFollowersIT = {
 
-    val testConfig : TwitterFollowingPipelineConfiguration =
-      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    testConfig.getTwitter.setEndpoint("followers")
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/followers").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterFollowingPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
     jobThread.join
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterFollowingPipeline/followers")))
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/FlinkTwitterFollowingPipeline/followers", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           > 500)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 8a942e5..a355696 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -1,12 +1,16 @@
 package com.peoplepattern.streams.twitter.collection
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 import java.util.concurrent.TimeUnit
 
 import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -20,23 +24,25 @@ import org.testng.annotations.Test
 /**
   * Created by sblackmon on 3/13/16.
   */
-class FlinkTwitterPostsPipelineIT extends FlatSpec {
+class FlinkTwitterPostsPipelineIT extends FlatSpec  {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
 
+  import FlinkTwitterPostsPipeline._
+
   @Test
   def flinkTwitterPostsPipelineIT = {
 
-    val testConfig : TwitterPostsPipelineConfiguration =
-      new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterPostsPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterPostsPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterPostsPipeline(config = testConfig)
     val jobThread = new Thread(job)
@@ -44,9 +50,9 @@ class FlinkTwitterPostsPipelineIT extends FlatSpec {
     jobThread.join
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterPostsPipeline")))
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/FlinkTwitterPostsPipeline", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           >= 200)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
new file mode 100644
index 0000000..f083f65
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -0,0 +1,57 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
+import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
+
+  import FlinkTwitterSpritzerPipeline._
+
+  @Test(enabled = false)
+  def flinkTwitterSpritzerPipelineIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterSpritzerPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          >= 200)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 3d21244..2ca8650 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -1,10 +1,13 @@
 package com.peoplepattern.streams.twitter.collection
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 
-import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.scalatest.FlatSpec
 import org.scalatest._
@@ -25,19 +28,21 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
 
+  import FlinkTwitterUserInformationPipeline._
+
   @Test
   def flinkTwitterUserInformationPipelineIT = {
 
-    val testConfig : TwitterUserInformationPipelineConfiguration =
-      new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/TwitterUserInformationPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterUserInformationPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterUserInformationPipeline(config = testConfig)
     val jobThread = new Thread(job)
@@ -45,9 +50,9 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
     jobThread.join
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/TwitterUserInformationPipeline")))
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/TwitterUserInformationPipeline", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           > 500)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 7054e89..6c50ca2 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -41,7 +41,4 @@
         <module>flink-twitter-collection</module>
     </modules>
 
-    <build>
-
-    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 7b653fc..52cd0fc 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -67,7 +67,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -76,34 +76,34 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-hdfs</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-reindex/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml
index e81cbe2..325e564 100644
--- a/local/elasticsearch-reindex/pom.xml
+++ b/local/elasticsearch-reindex/pom.xml
@@ -92,7 +92,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/mongo-elasticsearch-sync/pom.xml
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
index 318c47e..d268ed7 100644
--- a/local/mongo-elasticsearch-sync/pom.xml
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -66,7 +66,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -75,27 +75,27 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-mongo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-follow-graph/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-graph/pom.xml b/local/twitter-follow-graph/pom.xml
index d40adde..9bf980d 100644
--- a/local/twitter-follow-graph/pom.xml
+++ b/local/twitter-follow-graph/pom.xml
@@ -49,17 +49,17 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -70,12 +70,12 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-graph</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-history-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/pom.xml b/local/twitter-history-elasticsearch/pom.xml
index afc8cf0..ba6dbe7 100644
--- a/local/twitter-history-elasticsearch/pom.xml
+++ b/local/twitter-history-elasticsearch/pom.xml
@@ -69,7 +69,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -78,29 +78,29 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -111,7 +111,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 224bdd4..1b7b64f 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -67,7 +67,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -76,32 +76,32 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-filters</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>


[4/6] incubator-streams-examples git commit: improve documentation

Posted by sb...@apache.org.
improve documentation


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/58fefc07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/58fefc07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/58fefc07

Branch: refs/heads/master
Commit: 58fefc07fc09b45f0e4cfebc9b126cab1fa8a9a3
Parents: 0112a83
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Sep 29 22:02:14 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Sep 29 22:02:14 2016 -0500

----------------------------------------------------------------------
 .../resources/FlinkTwitterFollowingPipeline.dot | 37 ++++++++++++
 .../FlinkTwitterFollowingPipeline.dot.svg       | 63 ++++++++++++++++++++
 .../resources/FlinkTwitterPostsPipeline.dot     | 37 ++++++++++++
 .../resources/FlinkTwitterPostsPipeline.dot.svg | 63 ++++++++++++++++++++
 .../resources/FlinkTwitterSpritzerPipeline.dot  | 33 ++++++++++
 .../FlinkTwitterSpritzerPipeline.dot.svg        | 47 +++++++++++++++
 .../FlinkTwitterUserInformationPipeline.dot     | 37 ++++++++++++
 .../FlinkTwitterUserInformationPipeline.dot.svg | 63 ++++++++++++++++++++
 .../markdown/FlinkTwitterFollowingPipeline.md   |  4 +-
 .../site/markdown/FlinkTwitterPostsPipeline.md  |  2 +-
 .../markdown/FlinkTwitterSpritzerPipeline.md    | 41 +++++++++++++
 .../FlinkTwitterUserInformationPipeline.md      |  4 +-
 .../src/site/markdown/index.md                  | 16 ++++-
 .../test/FlinkTwitterFollowingPipelineIT.scala  |  4 +-
 14 files changed, 444 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterFollowingProvider -> source [dir=back,style=dashed];
+  TwitterFollowingProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg
new file mode 100644
index 0000000..79bee38
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterFollowingPipeline.dot.svg
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="504pt" height="203pt"
+ viewBox="0.00 0.00 504.29 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 500.286,-199 500.286,4 -4,4"/>
+<!-- source -->
+<g id="node1" class="node">
+<title>source</title>
+<polygon fill="none" stroke="black" points="275.082,-109 11.9727,-109 11.9727,-113 -0.0273492,-113 -0.0273492,-73 275.082,-73 275.082,-109"/>
+<polyline fill="none" stroke="black" points="-0.0273492,-109 11.9727,-109 "/>
+<text text-anchor="middle" x="137.527" y="-93.8" font-family="Times,serif" font-size="14.00">source</text>
+<text text-anchor="middle" x="137.527" y="-79.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${readerPath}</text>
+</g>
+<!-- TwitterFollowingProvider -->
+<g id="node2" class="node">
+<title>TwitterFollowingProvider</title>
+<g id="a_node2"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java" xlink:title="TwitterFollowingProvider">
+<ellipse fill="none" stroke="black" cx="200.527" cy="-177" rx="102.174" ry="18"/>
+<text text-anchor="middle" x="200.527" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterFollowingProvider</text>
+</a>
+</g>
+</g>
+<!-- TwitterFollowingProvider&#45;&gt;source -->
+<g id="edge1" class="edge">
+<title>TwitterFollowingProvider&#45;&gt;source</title>
+<path fill="none" stroke="black" stroke-dasharray="5,2" d="M181.043,-150.402C171.25,-137.034 159.635,-121.178 150.869,-109.212"/>
+<polygon fill="black" stroke="black" points="178.434,-152.763 187.167,-158.762 184.081,-148.626 178.434,-152.763"/>
+</g>
+<!-- RollingFileSink -->
+<g id="node3" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="359.527" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="359.527" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterFollowingProvider&#45;&gt;RollingFileSink -->
+<g id="edge2" class="edge">
+<title>TwitterFollowingProvider&#45;&gt;RollingFileSink</title>
+<path fill="none" stroke="black" d="M232.321,-159.803C257.806,-146.019 293.67,-126.621 320.824,-111.934"/>
+<polygon fill="black" stroke="black" points="322.54,-114.985 329.671,-107.149 319.21,-108.828 322.54,-114.985"/>
+<text text-anchor="middle" x="306.641" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node4" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="496.044,-36 235.01,-36 235.01,-40 223.01,-40 223.01,-0 496.044,-0 496.044,-36"/>
+<polyline fill="none" stroke="black" points="223.01,-36 235.01,-36 "/>
+<text text-anchor="middle" x="359.527" y="-20.8" font-family="Times,serif" font-size="14.00">destination</text>
+<text text-anchor="middle" x="359.527" y="-6.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink&#45;&gt;destination -->
+<g id="edge3" class="edge">
+<title>RollingFileSink&#45;&gt;destination</title>
+<path fill="none" stroke="black" d="M359.527,-72.9551C359.527,-64.8828 359.527,-55.1764 359.527,-46.1817"/>
+<polygon fill="black" stroke="black" points="363.027,-46.0903 359.527,-36.0904 356.027,-46.0904 363.027,-46.0903"/>
+</g>
+</g>
+</svg>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterTimelineProvider -> source [dir=back,style=dashed];
+  TwitterTimelineProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg
new file mode 100644
index 0000000..5698c45
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterPostsPipeline.dot.svg
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="504pt" height="203pt"
+ viewBox="0.00 0.00 504.29 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 500.286,-199 500.286,4 -4,4"/>
+<!-- source -->
+<g id="node1" class="node">
+<title>source</title>
+<polygon fill="none" stroke="black" points="275.082,-109 11.9727,-109 11.9727,-113 -0.0273492,-113 -0.0273492,-73 275.082,-73 275.082,-109"/>
+<polyline fill="none" stroke="black" points="-0.0273492,-109 11.9727,-109 "/>
+<text text-anchor="middle" x="137.527" y="-93.8" font-family="Times,serif" font-size="14.00">source</text>
+<text text-anchor="middle" x="137.527" y="-79.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${readerPath}</text>
+</g>
+<!-- TwitterTimelineProvider -->
+<g id="node2" class="node">
+<title>TwitterTimelineProvider</title>
+<g id="a_node2"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java" xlink:title="TwitterTimelineProvider">
+<ellipse fill="none" stroke="black" cx="200.527" cy="-177" rx="97.6831" ry="18"/>
+<text text-anchor="middle" x="200.527" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterTimelineProvider</text>
+</a>
+</g>
+</g>
+<!-- TwitterTimelineProvider&#45;&gt;source -->
+<g id="edge1" class="edge">
+<title>TwitterTimelineProvider&#45;&gt;source</title>
+<path fill="none" stroke="black" stroke-dasharray="5,2" d="M181.043,-150.402C171.25,-137.034 159.635,-121.178 150.869,-109.212"/>
+<polygon fill="black" stroke="black" points="178.434,-152.763 187.167,-158.762 184.081,-148.626 178.434,-152.763"/>
+</g>
+<!-- RollingFileSink -->
+<g id="node3" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="359.527" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="359.527" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterTimelineProvider&#45;&gt;RollingFileSink -->
+<g id="edge2" class="edge">
+<title>TwitterTimelineProvider&#45;&gt;RollingFileSink</title>
+<path fill="none" stroke="black" d="M232.321,-159.803C257.806,-146.019 293.67,-126.621 320.824,-111.934"/>
+<polygon fill="black" stroke="black" points="322.54,-114.985 329.671,-107.149 319.21,-108.828 322.54,-114.985"/>
+<text text-anchor="middle" x="306.641" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node4" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="496.044,-36 235.01,-36 235.01,-40 223.01,-40 223.01,-0 496.044,-0 496.044,-36"/>
+<polyline fill="none" stroke="black" points="223.01,-36 235.01,-36 "/>
+<text text-anchor="middle" x="359.527" y="-20.8" font-family="Times,serif" font-size="14.00">destination</text>
+<text text-anchor="middle" x="359.527" y="-6.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink&#45;&gt;destination -->
+<g id="edge3" class="edge">
+<title>RollingFileSink&#45;&gt;destination</title>
+<path fill="none" stroke="black" d="M359.527,-72.9551C359.527,-64.8828 359.527,-55.1764 359.527,-46.1817"/>
+<polygon fill="black" stroke="black" points="363.027,-46.0903 359.527,-36.0904 356.027,-46.0904 363.027,-46.0903"/>
+</g>
+</g>
+</svg>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+  //stream
+  TwitterStreamProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg
new file mode 100644
index 0000000..960a11f
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterSpritzerPipeline.dot.svg
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="282pt" height="203pt"
+ viewBox="0.00 0.00 281.52 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 277.517,-199 277.517,4 -4,4"/>
+<!-- TwitterStreamProvider -->
+<g id="node1" class="node">
+<title>TwitterStreamProvider</title>
+<g id="a_node1"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java" xlink:title="TwitterStreamProvider">
+<ellipse fill="none" stroke="black" cx="136.758" cy="-177" rx="91.0473" ry="18"/>
+<text text-anchor="middle" x="136.758" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterStreamProvider</text>
+</a>
+</g>
+</g>
+<!-- RollingFileSink -->
+<g id="node2" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="136.758" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="136.758" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterStreamProvider&#45;&gt;RollingFileSink -->
+<g id="edge1" class="edge">
+<title>TwitterStreamProvider&#45;&gt;RollingFileSink</title>
+<path fill="none" stroke="black" d="M136.758,-158.762C136.758,-147.36 136.758,-132.434 136.758,-119.494"/>
+<polygon fill="black" stroke="black" points="140.258,-119.212 136.758,-109.212 133.258,-119.212 140.258,-119.212"/>
+<text text-anchor="middle" x="153.872" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node3" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="273.275,-36 0.241273,-36 0.241273,-0 273.275,-0 273.275,-36"/>
+<text text-anchor="middle" x="136.758" y="-13.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink&#45;&gt;destination -->
+<g id="edge2" class="edge">
+<title>RollingFileSink&#45;&gt;destination</title>
+<path fill="none" stroke="black" d="M136.758,-72.9551C136.758,-64.8828 136.758,-55.1764 136.758,-46.1817"/>
+<polygon fill="black" stroke="black" points="140.258,-46.0903 136.758,-36.0904 133.258,-46.0904 140.258,-46.0903"/>
+</g>
+</g>
+</svg>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterUserInformationProvider -> source [dir=back,style=dashed];
+  TwitterUserInformationProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg
new file mode 100644
index 0000000..9dadc63
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/resources/FlinkTwitterUserInformationPipeline.dot.svg
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
+ "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<!-- Generated by graphviz version 2.39.20160214.2135 (20160214.2135)
+ -->
+<!-- Title: g Pages: 1 -->
+<svg width="504pt" height="203pt"
+ viewBox="0.00 0.00 504.29 203.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 199)">
+<title>g</title>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-199 500.286,-199 500.286,4 -4,4"/>
+<!-- source -->
+<g id="node1" class="node">
+<title>source</title>
+<polygon fill="none" stroke="black" points="275.082,-109 11.9727,-109 11.9727,-113 -0.0273492,-113 -0.0273492,-73 275.082,-73 275.082,-109"/>
+<polyline fill="none" stroke="black" points="-0.0273492,-109 11.9727,-109 "/>
+<text text-anchor="middle" x="137.527" y="-93.8" font-family="Times,serif" font-size="14.00">source</text>
+<text text-anchor="middle" x="137.527" y="-79.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${readerPath}</text>
+</g>
+<!-- TwitterUserInformationProvider -->
+<g id="node2" class="node">
+<title>TwitterUserInformationProvider</title>
+<g id="a_node2"><a xlink:href="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java" xlink:title="TwitterUserInformationProvider">
+<ellipse fill="none" stroke="black" cx="200.527" cy="-177" rx="124.824" ry="18"/>
+<text text-anchor="middle" x="200.527" y="-172.8" font-family="Times,serif" font-size="14.00">TwitterUserInformationProvider</text>
+</a>
+</g>
+</g>
+<!-- TwitterUserInformationProvider&#45;&gt;source -->
+<g id="edge1" class="edge">
+<title>TwitterUserInformationProvider&#45;&gt;source</title>
+<path fill="none" stroke="black" stroke-dasharray="5,2" d="M181.043,-150.402C171.25,-137.034 159.635,-121.178 150.869,-109.212"/>
+<polygon fill="black" stroke="black" points="178.434,-152.763 187.167,-158.762 184.081,-148.626 178.434,-152.763"/>
+</g>
+<!-- RollingFileSink -->
+<g id="node3" class="node">
+<title>RollingFileSink</title>
+<ellipse fill="none" stroke="black" cx="359.527" cy="-91" rx="66.7358" ry="18"/>
+<text text-anchor="middle" x="359.527" y="-86.8" font-family="Times,serif" font-size="14.00">RollingFileSink</text>
+</g>
+<!-- TwitterUserInformationProvider&#45;&gt;RollingFileSink -->
+<g id="edge2" class="edge">
+<title>TwitterUserInformationProvider&#45;&gt;RollingFileSink</title>
+<path fill="none" stroke="black" d="M232.703,-159.597C258.14,-145.839 293.738,-126.584 320.747,-111.975"/>
+<polygon fill="black" stroke="black" points="322.42,-115.05 329.551,-107.214 319.09,-108.893 322.42,-115.05"/>
+<text text-anchor="middle" x="306.641" y="-129.8" font-family="Times,serif" font-size="14.00">String</text>
+</g>
+<!-- destination -->
+<g id="node4" class="node">
+<title>destination</title>
+<polygon fill="none" stroke="black" points="496.044,-36 235.01,-36 235.01,-40 223.01,-40 223.01,-0 496.044,-0 496.044,-36"/>
+<polyline fill="none" stroke="black" points="223.01,-36 235.01,-36 "/>
+<text text-anchor="middle" x="359.527" y="-20.8" font-family="Times,serif" font-size="14.00">destination</text>
+<text text-anchor="middle" x="359.527" y="-6.8" font-family="Times,serif" font-size="14.00">hdfs://${host}:${port}/${path}/${writerPath}</text>
+</g>
+<!-- RollingFileSink&#45;&gt;destination -->
+<g id="edge3" class="edge">
+<title>RollingFileSink&#45;&gt;destination</title>
+<path fill="none" stroke="black" d="M359.527,-72.9551C359.527,-64.8828 359.527,-55.1764 359.527,-46.1817"/>
+<polygon fill="black" stroke="black" points="363.027,-46.0903 359.527,-36.0904 356.027,-46.0904 363.027,-46.0903"/>
+</g>
+</g>
+</svg>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
index 22f30f5..3ad23d3 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
@@ -19,7 +19,9 @@ Diagram:
 Example Configuration:
 ----------------------
 
-[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" )
+[FlinkTwitterFollowingPipelineFollowersIT.conf](FlinkTwitterFollowingPipelineFollowersIT.conf "FlinkTwitterFollowingPipelineFollowersIT.conf" )
+
+[FlinkTwitterFollowingPipelineFriendsIT.conf](FlinkTwitterFollowingPipelineFriendsIT.conf "FlinkTwitterFollowingPipelineFriendsIT.conf" )
 
 Run (Local):
 ------------

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
index 5f77994..fe6b544 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
@@ -19,7 +19,7 @@ Diagram:
 Example Configuration:
 ----------------------
 
-[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" )
+[FlinkTwitterPostsPipelineIT.conf](FlinkTwitterPostsPipelineIT.conf "FlinkTwitterPostsPipelineIT.conf" )
 
 Run (Local):
 ------------

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
new file mode 100644
index 0000000..259fe7f
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterSpritzerPipeline
+============================
+
+Description:
+-----------------
+
+Collects twitter posts in real-time from the sample endpoint with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterSpritzerPipeline.dot](FlinkTwitterSpritzerPipeline.dot "FlinkTwitterSpritzerPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterSpritzerPipeline.dot.svg](./FlinkTwitterSpritzerPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterSpritzerPipelineIT.conf](FlinkTwitterSpritzerPipelineIT.conf "FlinkTwitterSpritzerPipelineIT.conf" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
index 5e0d1fe..a465de9 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -4,7 +4,7 @@ FlinkTwitterUserInformationPipeline
 Description:
 -----------------
 
-Collects twitter users with flink.
+Collects twitter user profiles with flink.
 
 Specification:
 -----------------
@@ -19,7 +19,7 @@ Diagram:
 Example Configuration:
 ----------------------
 
-[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+[FlinkTwitterUserInformationPipelineIT.conf](FlinkTwitterUserInformationPipelineIT.conf "FlinkTwitterUserInformationPipelineIT.conf" )
 
 Run (Local):
 ------------

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
index 19e44cf..0f15603 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -22,10 +22,24 @@ Streams:
 
 <a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
 
+Test:
+-----
+
+Create a local file `application.conf` with valid twitter credentials
+
+    twitter {
+      oauth {
+        consumerKey = ""
+        consumerSecret = ""
+        accessToken = ""
+        accessTokenSecret = ""
+      }
+    }
+    
 Build:
 ---------
 
-    mvn clean install verify
+    mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
 
 [JavaDocs](apidocs/index.html "JavaDocs")
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/58fefc07/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
index b051e90..e6294f6 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -27,7 +27,7 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
 
   import FlinkTwitterFollowingPipeline._
 
-  @Test
+  @Test(enabled = false)
   def flinkTwitterFollowersPipelineFriendsIT = {
 
     val reference: Config = ConfigFactory.load()
@@ -55,7 +55,7 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
 
   }
 
-  @Test
+  @Test(enabled = false)
   def flinkTwitterFollowersPipelineFollowersIT = {
 
     val reference: Config = ConfigFactory.load()


[5/6] incubator-streams-examples git commit: all five flink examples passing

Posted by sb...@apache.org.
all five flink examples passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/9dcdf645
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/9dcdf645
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/9dcdf645

Branch: refs/heads/master
Commit: 9dcdf645080302d2f8e1bc7dc3d312817d459cf5
Parents: 58fefc0
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 16:42:25 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 16:42:25 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/pom.xml          | 28 ++++---
 .../FlinkTwitterFollowingPipeline.scala         |  2 +-
 .../collection/FlinkTwitterPostsPipeline.scala  |  2 +-
 .../FlinkTwitterSpritzerPipeline.scala          | 28 +++++--
 .../FlinkTwitterUserInformationPipeline.scala   |  2 +-
 .../markdown/FlinkTwitterSpritzerPipeline.md    |  6 +-
 .../src/site/markdown/index.md                  |  6 +-
 .../resources/FlinkTwitterFollowingPipeline.dot | 37 +++++++++
 .../resources/FlinkTwitterPostsPipeline.dot     | 37 +++++++++
 .../resources/FlinkTwitterSpritzerPipeline.dot  | 33 ++++++++
 .../FlinkTwitterUserInformationPipeline.dot     | 37 +++++++++
 ...linkTwitterFollowingPipelineFollowersIT.conf |  6 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.conf |  5 +-
 .../FlinkTwitterSpritzerPipelineIT.conf         | 15 ++++
 .../FlinkTwitterUserInformationPipelineIT.conf  |  2 +-
 ...inkTwitterFollowingPipelineFollowersIT.scala | 55 +++++++++++++
 ...FlinkTwitterFollowingPipelineFriendsIT.scala | 59 ++++++++++++++
 .../test/FlinkTwitterFollowingPipelineIT.scala  | 86 --------------------
 .../test/FlinkTwitterSpritzerPipelineIT.scala   |  9 +-
 19 files changed, 336 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 2d35035..4cf0b89 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -448,16 +448,24 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.12.4</version>
-                <executions>
-                    <execution>
-                        <id>integration-tests</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
+                <configuration>
+                    <!-- Run integration test suite rather than individual tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*IT.java</include>
+                        <include>**/*ITs.java</include>
+                    </includes>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.surefire</groupId>
+                        <artifactId>surefire-testng</artifactId>
+                        <version>${failsafe.plugin.version}</version>
+                    </dependency>
+                </dependencies>
             </plugin>
         </plugins>
     </build>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2fd9336..a20078e 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -126,7 +126,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
         // if( test == true ) jsons.print();
 
-        env.execute("FlinkTwitterFollowingPipeline")
+        env.execute(STREAMS_ID)
     }
 
     class FollowingCollectorFlatMapFunction(

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index beea973..bb7d54c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -134,7 +134,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     // if( test == true ) jsons.print();
 
-    env.execute("FlinkTwitterPostsPipeline")
+    env.execute(STREAMS_ID)
   }
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index b615806..d6ed3df 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -1,10 +1,12 @@
 package org.apache.streams.examples.flink.twitter.collection
 
+import java.io.Serializable
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.StoppableFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -21,6 +23,7 @@ import org.apache.streams.twitter.TwitterStreamConfiguration
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.slf4j.{Logger, LoggerFactory}
 import org.apache.flink.api.scala._
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 
 import scala.collection.JavaConversions._
 
@@ -82,6 +85,8 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
   import FlinkTwitterSpritzerPipeline._
 
+  val spritzerSource = new SpritzerSource(config.getTwitter)
+
   override def run(): Unit = {
 
     val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
@@ -91,7 +96,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     val outPath = buildWriterPath(config.getDestination)
 
-    val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter));
+    val streamSource : DataStream[String] = env.addSource(spritzerSource);
 
     if( config.getTest == false )
       streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
@@ -101,15 +106,23 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     // if( test == true ) jsons.print();
 
-    env.execute("FlinkTwitterPostsPipeline")
+    env.execute(STREAMS_ID)
+
+  }
+
+  def stop(): Unit = {
+    spritzerSource.stop()
   }
 
-  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+
+    var mapper: ObjectMapper = _
 
     var twitProvider: TwitterStreamProvider = _
 
     @throws[Exception]
     override def open(parameters: Configuration): Unit = {
+      mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
       twitProvider = new TwitterStreamProvider( sourceConfig )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()
@@ -120,17 +133,16 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
       do {
         Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
         iterator = twitProvider.readCurrent().iterator()
-        iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+        iterator.toList.map(datum => ctx.collect(mapper.writeValueAsString(datum.getDocument)))
       } while( twitProvider.isRunning )
     }
 
     override def cancel(): Unit = {
-      twitProvider.cleanUp()
+      close()
     }
 
-    @throws[Exception]
-    override def close(): Unit = {
-      twitProvider.cleanUp()
+    override def stop(): Unit = {
+      close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 867255d..ad0315a 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -128,7 +128,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
 
-    env.execute("FlinkTwitterUserInformationPipeline")
+    env.execute(STREAMS_ID)
   }
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
index 259fe7f..1e59039 100644
--- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -24,17 +24,17 @@ Example Configuration:
 Run (Local):
 ------------
 
-    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
 
 Run (Flink):
 ------------
 
-    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file> 
 
 Run (YARN):
 -----------
 
-    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file> 
 
 [JavaDocs](apidocs/index.html "JavaDocs")
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
index 0f15603..24783be 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -16,11 +16,13 @@ Collects large batches of documents from api.twitter.com from a seed set of ids.
 Streams:
 --------
 
-<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
 
 <a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
 
-<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+<a href="FlinkTwitterSpritzerPipeline.html" target="_self">FlinkTwitterSpritzerPipeline</a>
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
 
 Test:
 -----

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterFollowingProvider -> source [dir=back,style=dashed];
+  TwitterFollowingProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterTimelineProvider -> source [dir=back,style=dashed];
+  TwitterTimelineProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+  //stream
+  TwitterStreamProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterUserInformationProvider -> source [dir=back,style=dashed];
+  TwitterUserInformationProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 87057be..3e922ab 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -10,7 +10,11 @@ destination {
   path = "target/test-classes"
   writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
 }
-twitter.endpoint = friends
+twitter {
+  endpoint = followers
+  ids_only = true
+  max_items = 5000
+}
 providerWaitMs = 1000
 local = true
 test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index b5212ed..038a8dc 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -10,7 +10,10 @@ destination {
   path = "target/test-classes"
   writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
 }
-twitter.endpoint = friends
+twitter {
+  endpoint = friends
+  ids_only = true
+}
 providerWaitMs = 1000
 local = true
 test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
new file mode 100644
index 0000000..fec4769
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
@@ -0,0 +1,15 @@
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterSpritzerPipelineIT"
+}
+twitter {
+  endpoint = sample
+  track = [
+    "data"
+  ]
+}
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index 342a850..d3663fe 100644
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -2,7 +2,7 @@ source {
   fields = ["ID"]
   scheme = file
   path = "target/test-classes"
-  readerPath = "asf.txt"
+  readerPath = "1000twitterids.txt"
 }
 destination {
   fields = ["DOC"]

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
new file mode 100644
index 0000000..f38ad92
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -0,0 +1,55 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
+
+  import FlinkTwitterFollowingPipeline._
+
+  @Test
+  def flinkTwitterFollowersPipelineFollowersIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          > 4000)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
new file mode 100644
index 0000000..464e743
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -0,0 +1,59 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.junit.Ignore
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
+
+  import FlinkTwitterFollowingPipeline._
+
+  @Test
+  def flinkTwitterFollowersPipelineFriendsIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          > 90)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
deleted file mode 100644
index e6294f6..0000000
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.peoplepattern.streams.twitter.collection
-
-import java.io.File
-import java.nio.file.{Files, Paths}
-
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-import org.testng.annotations.Test
-
-import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
-
-/**
-  * Created by sblackmon on 3/13/16.
-  */
-class FlinkTwitterFollowingPipelineIT extends FlatSpec {
-
-  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
-
-  import FlinkTwitterFollowingPipeline._
-
-  @Test(enabled = false)
-  def flinkTwitterFollowersPipelineFriendsIT = {
-
-    val reference: Config = ConfigFactory.load()
-    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
-    assert(conf_file.exists())
-    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
-    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
-    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
-    setup(testConfig)
-
-    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
-    val jobThread = new Thread(job)
-    jobThread.start
-    jobThread.join
-
-    eventually (timeout(60 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 90)
-    }
-
-  }
-
-  @Test(enabled = false)
-  def flinkTwitterFollowersPipelineFollowersIT = {
-
-    val reference: Config = ConfigFactory.load()
-    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
-    assert(conf_file.exists())
-    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
-    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
-    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
-    setup(testConfig)
-
-    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
-    val jobThread = new Thread(job)
-    jobThread.start
-    jobThread.join
-
-    eventually (timeout(60 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 500)
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index f083f65..2e2e9b1 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -26,7 +26,7 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
 
   import FlinkTwitterSpritzerPipeline._
 
-  @Test(enabled = false)
+  @Test
   def flinkTwitterSpritzerPipelineIT = {
 
     val reference: Config = ConfigFactory.load()
@@ -43,13 +43,14 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
     val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
-    jobThread.join
+    jobThread.join(30000)
+    job.stop()
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
+    eventually (timeout(60 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
         Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 200)
+          >= 10)
     }
 
   }



[6/6] incubator-streams-examples git commit: Merge branch 'flink'

Posted by sb...@apache.org.
Merge branch 'flink'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/8fe6860f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/8fe6860f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/8fe6860f

Branch: refs/heads/master
Commit: 8fe6860f7e0587f595409914bba61ff6a74cbcf5
Parents: 6e93a8f 9dcdf64
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sun Oct 9 16:35:21 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sun Oct 9 16:35:21 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/README.md        |    8 +
 flink/flink-twitter-collection/pom.xml          |  473 +++++++++
 .../jsonschema/FlinkBatchConfiguration.json     |   12 +
 .../jsonschema/FlinkStreamingConfiguration.json |   40 +
 .../jsonschema/StreamsFlinkConfiguration.json   |   48 +
 .../TwitterFollowingPipelineConfiguration.json  |   29 +
 .../TwitterPostsPipelineConfiguration.json      |   29 +
 .../TwitterSpritzerPipelineConfiguration.json   |   29 +
 ...terUserInformationPipelineConfiguration.json |   29 +
 .../resources/FlinkTwitterFollowingPipeline.dot |   37 +
 .../FlinkTwitterFollowingPipeline.dot.svg       |   63 ++
 .../resources/FlinkTwitterPostsPipeline.dot     |   37 +
 .../resources/FlinkTwitterPostsPipeline.dot.svg |   63 ++
 .../resources/FlinkTwitterSpritzerPipeline.dot  |   33 +
 .../FlinkTwitterSpritzerPipeline.dot.svg        |   47 +
 .../FlinkTwitterUserInformationPipeline.dot     |   37 +
 .../FlinkTwitterUserInformationPipeline.dot.svg |   63 ++
 .../streams/examples/flink/FlinkBase.scala      |  200 ++++
 .../FlinkTwitterFollowingPipeline.scala         |  156 +++
 .../collection/FlinkTwitterPostsPipeline.scala  |  161 +++
 .../FlinkTwitterSpritzerPipeline.scala          |  150 +++
 .../FlinkTwitterUserInformationPipeline.scala   |  159 +++
 .../markdown/FlinkTwitterFollowingPipeline.md   |   43 +
 .../site/markdown/FlinkTwitterPostsPipeline.md  |   41 +
 .../markdown/FlinkTwitterSpritzerPipeline.md    |   41 +
 .../FlinkTwitterUserInformationPipeline.md      |   41 +
 .../src/site/markdown/index.md                  |   48 +
 .../site/resources/FlinkBatchConfiguration.json |   12 +
 .../resources/FlinkStreamingConfiguration.json  |   40 +
 .../resources/FlinkTwitterFollowingPipeline.dot |   37 +
 .../resources/FlinkTwitterPostsPipeline.dot     |   37 +
 .../resources/FlinkTwitterSpritzerPipeline.dot  |   33 +
 .../FlinkTwitterUserInformationPipeline.dot     |   37 +
 .../resources/StreamsFlinkConfiguration.json    |   48 +
 .../TwitterFollowingBatchConfiguration.json     |   23 +
 .../TwitterFollowingPipelineConfiguration.json  |   29 +
 .../TwitterPostsBatchConfiguration.json         |   23 +
 .../TwitterPostsPipelineConfiguration.json      |   29 +
 .../TwitterSpritzerPipelineConfiguration.json   |   29 +
 ...witterUserInformationBatchConfiguration.json |   23 +
 ...terUserInformationPipelineConfiguration.json |   29 +
 .../src/test/resources/1000twitterids.txt       | 1000 ++++++++++++++++++
 ...linkTwitterFollowingPipelineFollowersIT.conf |   20 +
 .../FlinkTwitterFollowingPipelineFriendsIT.conf |   19 +
 .../resources/FlinkTwitterPostsPipelineIT.conf  |   15 +
 .../FlinkTwitterSpritzerPipelineIT.conf         |   15 +
 .../FlinkTwitterUserInformationPipelineIT.conf  |   15 +
 .../src/test/resources/asf.txt                  |    1 +
 ...inkTwitterFollowingPipelineFollowersIT.scala |   55 +
 ...FlinkTwitterFollowingPipelineFriendsIT.scala |   59 ++
 .../test/FlinkTwitterPostsPipelineIT.scala      |   61 ++
 .../test/FlinkTwitterSpritzerPipelineIT.scala   |   58 +
 .../FlinkTwitterUserInformationPipelineIT.scala |   61 ++
 flink/pom.xml                                   |   44 +
 local/elasticsearch-hdfs/pom.xml                |   14 +-
 local/elasticsearch-reindex/pom.xml             |    2 +-
 local/mongo-elasticsearch-sync/pom.xml          |   12 +-
 local/twitter-follow-graph/pom.xml              |   10 +-
 local/twitter-history-elasticsearch/pom.xml     |   14 +-
 local/twitter-userstream-elasticsearch/pom.xml  |   14 +-
 pom.xml                                         |   29 +-
 61 files changed, 4017 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8fe6860f/pom.xml
----------------------------------------------------------------------


[2/6] incubator-streams-examples git commit: flink example

Posted by sb...@apache.org.
flink example


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4491cfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4491cfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4491cfe1

Branch: refs/heads/master
Commit: 4491cfe1d0bf7324073537e89e7e8b6ed8ab43d5
Parents: b3429dd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Sep 26 12:43:22 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Sep 26 12:43:22 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/README.md        |    8 +
 flink/flink-twitter-collection/pom.xml          |  420 ++++++++
 .../jsonschema/FlinkBatchConfiguration.json     |   12 +
 .../jsonschema/FlinkStreamingConfiguration.json |   40 +
 .../jsonschema/StreamsFlinkConfiguration.json   |   48 +
 .../TwitterFollowingPipelineConfiguration.json  |   29 +
 .../TwitterPostsPipelineConfiguration.json      |   29 +
 ...terUserInformationPipelineConfiguration.json |   29 +
 .../streams/examples/flink/FlinkBase.scala      |  200 ++++
 .../FlinkTwitterFollowingPipeline.scala         |  149 +++
 .../collection/FlinkTwitterPostsPipeline.scala  |  165 +++
 .../FlinkTwitterUserInformationPipeline.scala   |  163 +++
 .../markdown/FlinkTwitterFollowingPipeline.md   |   41 +
 .../site/markdown/FlinkTwitterPostsPipeline.md  |   41 +
 .../FlinkTwitterUserInformationPipeline.md      |   41 +
 .../src/site/markdown/index.md                  |   32 +
 .../site/resources/FlinkBatchConfiguration.json |   12 +
 .../resources/FlinkStreamingConfiguration.json  |   40 +
 .../resources/StreamsFlinkConfiguration.json    |   48 +
 .../TwitterFollowingBatchConfiguration.json     |   23 +
 .../TwitterFollowingPipelineConfiguration.json  |   29 +
 .../TwitterPostsBatchConfiguration.json         |   23 +
 .../TwitterPostsPipelineConfiguration.json      |   29 +
 ...witterUserInformationBatchConfiguration.json |   23 +
 ...terUserInformationPipelineConfiguration.json |   29 +
 .../src/test/resources/1000twitterids.txt       | 1000 ++++++++++++++++++
 .../FlinkTwitterFollowingPipeline.conf          |   10 +
 .../resources/FlinkTwitterPostsPipeline.conf    |   10 +
 .../FlinkTwitterUserInformationPipeline.conf    |   10 +
 .../src/test/resources/asf.txt                  |    1 +
 .../test/FlinkTwitterFollowingPipelineIT.scala  |   81 ++
 .../test/FlinkTwitterPostsPipelineIT.scala      |   55 +
 .../FlinkTwitterUserInformationPipelineIT.scala |   56 +
 flink/pom.xml                                   |   47 +
 pom.xml                                         |   29 +-
 35 files changed, 2988 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/README.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/README.md b/flink/flink-twitter-collection/README.md
new file mode 100644
index 0000000..f9fe687
--- /dev/null
+++ b/flink/flink-twitter-collection/README.md
@@ -0,0 +1,8 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+org.apache.streams:flink-twitter-collection
+===========================================
+
+[README.md](src/site/markdown/index.md "README")

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
new file mode 100644
index 0000000..33b05fe
--- /dev/null
+++ b/flink/flink-twitter-collection/pom.xml
@@ -0,0 +1,420 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-examples-flink</artifactId>
+        <version>0.4-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-twitter-collection</artifactId>
+    <name>flink-twitter-collection</name>
+
+    <description>Collects twitter documents using flink.</description>
+
+    <properties>
+        <docker.repo>apachestreams</docker.repo>
+        <hdfs.version>2.7.0</hdfs.version>
+        <flink.version>1.1.2</flink.version>
+        <scala.suffix>2.10</scala.suffix>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-provider-twitter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-hdfs</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hdfs.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-filesystem_2.10</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-core</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>data</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <!-- This binary runs with logback -->
+            <!-- Keep log4j out -->
+            <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <version>1.3.1</version>
+            <executions>
+                <execution>
+                    <id>enforce-banned-dependencies</id>
+                    <goals>
+                        <goal>enforce</goal>
+                    </goals>
+                    <configuration>
+                        <rules>
+                            <bannedDependencies>
+                                <excludes>
+                                    <exclude>org.slf4j:slf4j-log4j12</exclude>
+                                    <exclude>org.slf4j:slf4j-jcl</exclude>
+                                    <exclude>org.slf4j:slf4j-jdk14</exclude>
+                                    <exclude>org.log4j:log4j</exclude>
+                                    <exclude>commons-logging:commons-logging</exclude>
+                                </excludes>
+                            </bannedDependencies>
+                        </rules>
+                        <fail>true</fail>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+        </plugin>
+        <plugin>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+            <version>0.4.1</version>
+            <configuration>
+                <addCompileSourceRoot>true</addCompileSourceRoot>
+                <generateBuilders>true</generateBuilders>
+                <sourcePaths>
+                    <sourcePath>src/main/jsonschema</sourcePath>
+                </sourcePaths>
+                <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+                <useJodaDates>false</useJodaDates>
+            </configuration>
+            <executions>
+                <execution>
+                    <goals>
+                        <goal>generate</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <includes>**/*.json</includes>
+                    <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                    <includeGroupIds>org.apache.streams</includeGroupIds>
+                    <includeTypes>test-jar</includeTypes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test-resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.12.4</version>
+                <executions>
+                    <execution>
+                        <id>integration-tests</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>io.fabric8</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "test": {
+      "type": "boolean",
+      "default": false
+    },
+    "local": {
+      "type": "boolean",
+      "default": true
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
new file mode 100644
index 0000000..1f1ed6d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -0,0 +1,200 @@
+package org.apache.streams.examples.flink
+
+import java.net.MalformedURLException
+
+import com.google.common.base.Strings
+import com.typesafe.config.Config
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.slf4j.LoggerFactory
+
+trait FlinkBase {
+
+  private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
+  private val MAPPER = StreamsJacksonMapper.getInstance()
+
+  var configUrl : String = _
+  var typesafe : Config = _
+  var streamsConfig = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig)
+  var streamsFlinkConfiguration: StreamsFlinkConfiguration = _
+
+  var executionEnvironment: ExecutionEnvironment = _
+  var streamExecutionEnvironment: StreamExecutionEnvironment = _
+
+  /*
+   Basic stuff for every flink job
+   */
+  def main(args: Array[String]): Unit = {
+    // if only one argument, use it as the config URL
+    if( args.size > 0 ) {
+      BASELOGGER.info("Args: {}", args)
+      configUrl = args(0)
+      setup(configUrl)
+    }
+
+  }
+
+  def setup(configUrl : String): Boolean =  {
+    BASELOGGER.info("StreamsConfigurator.config: {}", StreamsConfigurator.config)
+    if( !Strings.isNullOrEmpty(configUrl)) {
+      BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl))
+      try {
+        typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.config).resolve()
+      } catch {
+        case mue: MalformedURLException => {
+          BASELOGGER.error("Invalid Configuration URL: ", mue)
+          return false
+        }
+        case e: Exception => {
+          BASELOGGER.error("Invalid Configuration URL: ", e)
+          return false
+        }
+      }
+    }
+    else {
+      typesafe = StreamsConfigurator.getConfig
+    }
+
+    return setup(typesafe)
+
+  }
+
+  def setup(typesafe : Config): Boolean =  {
+    this.typesafe = typesafe
+
+    BASELOGGER.info("Typesafe Config: {}", typesafe)
+
+    if( this.typesafe.getString("mode").equals("streaming")) {
+      val streamingConfiguration: FlinkStreamingConfiguration =
+        new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe)
+      return setupStreaming(streamingConfiguration)
+    } else if( this.typesafe.getString("mode").equals("batch")) {
+      val batchConfiguration: FlinkBatchConfiguration =
+        new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe)
+      return setupBatch(batchConfiguration)
+    } else {
+      return false;
+    }
+  }
+
+//  def setup(typesafe: Config): Boolean =  {
+//
+//    val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+//
+//    this.streamsConfig = streamsConfig
+//
+//    BASELOGGER.info("Streams Config: " + streamsConfig)
+//
+//    setup(streamsConfig)
+//  }
+
+  def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
+
+    BASELOGGER.info("FsStreamingFlinkConfiguration: " + streamingConfiguration)
+
+    this.streamsFlinkConfiguration = streamingConfiguration
+
+    if( streamsFlinkConfiguration == null) return false
+
+    if( streamExecutionEnvironment == null )
+      streamExecutionEnvironment = streamEnvironment(streamingConfiguration)
+
+    return false
+
+  }
+
+  def setupBatch(batchConfiguration: FlinkBatchConfiguration): Boolean =  {
+
+    BASELOGGER.info("FsBatchFlinkConfiguration: " + batchConfiguration)
+
+    this.streamsFlinkConfiguration = batchConfiguration
+
+    if( streamsFlinkConfiguration == null) return false
+
+    if( executionEnvironment == null )
+      executionEnvironment = batchEnvironment(batchConfiguration)
+
+    return true
+
+  }
+
+  def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = {
+    if (config.getTest == false && config.getLocal == false) {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      return env
+    } else {
+      val env = ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+      return env
+    }
+  }
+
+  def streamEnvironment(config: FlinkStreamingConfiguration = new FlinkStreamingConfiguration()) : StreamExecutionEnvironment = {
+    if( config.getTest == false && config.getLocal == false) {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      env.setRestartStrategy(RestartStrategies.noRestart());
+
+      // start a checkpoint every hour
+      env.enableCheckpointing(config.getCheckpointIntervalMs)
+
+      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+      // checkpoints have to complete within five minutes, or are discarded
+      env.getCheckpointConfig.setCheckpointTimeout(config.getCheckpointTimeoutMs)
+
+      // allow only one checkpoint to be in progress at the same time
+      env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+
+      return env
+    }
+
+    else return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+  }
+
+  def buildReaderPath(configObject: HdfsReaderConfiguration) : String = {
+    var inPathBuilder : String = ""
+    if (configObject.getScheme.equals(HdfsConfiguration.Scheme.FILE)) {
+      inPathBuilder = configObject.getPath + "/" + configObject.getReaderPath
+    }
+    else if (configObject.getScheme.equals(HdfsConfiguration.Scheme.HDFS)) {
+      inPathBuilder = configObject.getScheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + configObject.getReaderPath
+    }
+    else if (configObject.getScheme.toString.equals("s3")) {
+      inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath
+    } else {
+      throw new Exception("scheme not recognized: " + configObject.getScheme)
+    }
+    return inPathBuilder
+  }
+
+  def buildWriterPath(configObject: HdfsWriterConfiguration) : String = {
+    var outPathBuilder : String = ""
+    if( configObject.getScheme.equals(HdfsConfiguration.Scheme.FILE)) {
+      outPathBuilder = configObject.getPath + "/" + configObject.getWriterPath
+    }
+    else if( configObject.getScheme.equals(HdfsConfiguration.Scheme.HDFS)) {
+      outPathBuilder = configObject.getScheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + configObject.getWriterPath
+    }
+    else if( configObject.getScheme.toString.equals("s3")) {
+      outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath
+    } else {
+      throw new Exception("output scheme not recognized: " + configObject.getScheme)
+    }
+    return outPathBuilder
+  }
+
+  def toProviderId(input : String) : String = {
+    if( input.startsWith("@") )
+      return input.substring(1)
+    if( input.contains(':'))
+      return input.substring(input.lastIndexOf(':')+1)
+    else return input
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
new file mode 100644
index 0000000..2ac7d32
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -0,0 +1,149 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterFollowingConfiguration
+import org.apache.streams.twitter.pojo.Follow
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 4/20/16.
+ */
+/**
+ * Created by sblackmon on 3/15/16.
+ */
+object FlinkTwitterFollowingPipeline extends FlinkBase {
+
+    val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+
+    private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+    private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+    override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+    }
+
+    def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean =  {
+
+        LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+        if( jobConfig == null ) {
+            LOGGER.error("jobConfig is null!")
+            System.err.println("jobConfig is null!")
+            return false
+        }
+
+        if( jobConfig.getSource == null ) {
+            LOGGER.error("jobConfig.getSource is null!")
+            System.err.println("jobConfig.getSource is null!")
+            return false
+        }
+
+        if( jobConfig.getDestination == null ) {
+            LOGGER.error("jobConfig.getDestination is null!")
+            System.err.println("jobConfig.getDestination is null!")
+            return false
+        }
+
+        if( jobConfig.getTwitter == null ) {
+            LOGGER.error("jobConfig.getTwitter is null!")
+            System.err.println("jobConfig.getTwitter is null!")
+            return false
+        }
+
+        return true
+
+    }
+
+}
+
+class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+    import FlinkTwitterFollowingPipeline._
+
+    override def run(): Unit = {
+
+        val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+        env.setNumberOfExecutionRetries(0)
+
+        val inPath = buildReaderPath(config.getSource)
+
+        val outPath = buildWriterPath(config.getDestination)
+
+        val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+
+        // these datums contain 'Follow' objects
+        val followDatums: DataStream[StreamsDatum] =
+            keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+
+        val follows: DataStream[Follow] = followDatums
+          .map(datum => datum.getDocument.asInstanceOf[Follow])
+
+        val jsons: DataStream[String] = follows
+          .map(follow => {
+              val MAPPER = StreamsJacksonMapper.getInstance
+              MAPPER.writeValueAsString(follow)
+          })
+
+        if( config.getTest == false )
+            jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+        else
+            jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+              .setParallelism(env.getParallelism);
+
+        // if( test == true ) jsons.print();
+
+        env.execute("FlinkTwitterFollowingPipeline")
+    }
+
+    class FollowingCollectorFlatMapFunction(
+                                             twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+                                             flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+                                           ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+        override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+            collectConnections(input, out)
+        }
+
+        def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+            val twitProvider: TwitterFollowingProvider =
+                new TwitterFollowingProvider(
+                    twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
+                )
+            twitProvider.prepare(twitProvider)
+            twitProvider.startStream()
+            var iterator: Iterator[StreamsDatum] = null
+            do {
+                Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+                twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+            } while( twitProvider.isRunning )
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
new file mode 100644
index 0000000..f8e221c
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -0,0 +1,165 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
+import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
+import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 7/29/15.
+  */
+object FlinkTwitterPostsPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  def setup(jobConfig: TwitterPostsPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterPostsPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    return true
+
+  }
+
+}
+
+class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterPostsPipeline._
+
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    //val inProps = buildKafkaProps(config.getSourceTopic)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    //val idTopicIn = new KafkaSink()
+
+//    val idTopicOut : DataStream[String] = env.addSource[String](
+//      new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
+//        inProps));
+
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+
+    // these datums contain 'Tweet' objects
+    val tweetDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+
+    val tweets: DataStream[Tweet] = tweetDatums
+      .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
+
+    val jsons: DataStream[String] = tweets
+      .map(tweet => {
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(tweet)
+      }).name("json")
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism);
+
+    // if( test == true ) jsons.print();
+
+    env.execute("FlinkTwitterPostsPipeline")
+  }
+
+  class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+        collectPosts(input, out)
+    }
+    def collectPosts(id : String, out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterTimelineProvider =
+        new TwitterTimelineProvider(
+          twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
new file mode 100644
index 0000000..a081c74
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -0,0 +1,163 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.lang
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
+
+import scala.collection.JavaConversions._
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers._
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * Created by sblackmon on 3/15/16.
+  */
+object FlinkTwitterUserInformationPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  def setup(jobConfig: TwitterUserInformationPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    return true
+
+  }
+
+}
+
+class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterUserInformationPipeline._
+
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+
+    val idWindows: WindowedStream[String, Int, GlobalWindow] = keyed_ids.countWindow(100)
+
+    val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
+
+    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+
+    val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
+
+    val jsons: DataStream[String] = user
+      .map(user => {
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(user)
+      }).name("jsons")
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism);
+
+    LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
+
+    env.execute("FlinkTwitterUserInformationPipeline")
+  }
+
+  class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
+    override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
+        out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+    }
+  }
+
+  class profileCollectorFlatMapFunction extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+    override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+      collectProfiles(input, out)
+    }
+    def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterUserInformationProvider =
+        new TwitterUserInformationProvider(
+          twitterConfiguration.withInfo(ids)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
new file mode 100644
index 0000000..22f30f5
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterFollowingPipeline
+=============================
+
+Description:
+-----------------
+
+Collects twitter friends or followers with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterFollowingPipeline.dot](FlinkTwitterFollowingPipeline.dot "FlinkTwitterFollowingPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterFollowingPipeline.dot.svg](./FlinkTwitterFollowingPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
new file mode 100644
index 0000000..5f77994
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterPostsPipeline
+=========================
+
+Description:
+-----------------
+
+Collects twitter posts with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterPostsPipeline.dot](FlinkTwitterPostsPipeline.dot "FlinkTwitterPostsPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterPostsPipeline.dot.svg](./FlinkTwitterPostsPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
new file mode 100644
index 0000000..5e0d1fe
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterUserInformationPipeline
+===================================
+
+Description:
+-----------------
+
+Collects twitter users with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterUserInformationPipeline.dot](FlinkTwitterUserInformationPipeline.dot "FlinkTwitterUserInformationPipeline.dot" )
+
+Diagram:
+-----------------
+
+![TwitterUserInformationPipeline.dot.svg](./TwitterUserInformationPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
new file mode 100644
index 0000000..19e44cf
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+flink-twitter-collection
+========================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+
+Description:
+------------
+Collects large batches of documents from api.twitter.com from a seed set of ids.
+
+Streams:
+--------
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+
+<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
+
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+
+Build:
+---------
+
+    mvn clean install verify
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "test": {
+      "type": "boolean",
+      "default": false
+    },
+    "local": {
+      "type": "boolean",
+      "default": true
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
new file mode 100644
index 0000000..33afb29
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterFollowingBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
new file mode 100644
index 0000000..376bb4d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterPostsBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
new file mode 100644
index 0000000..55f9fbd
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterUserInformationBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file